azkaban-aplcache

Event reporting for azkaban events. (#1332) Summary: With

9/11/2017 10:41:26 PM

Changes

build.gradle 5(+4 -1)

Details

diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 3ecaf78..fe66d4f 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -116,6 +116,17 @@ public class Constants {
     public static final String AZKABAN_KEYTAB_PATH = "azkaban.keytab.path";
     public static final String PROJECT_TEMP_DIR = "project.temp.dir";
 
+    // Event reporting properties
+    public static final String AZKABAN_EVENT_REPORTING_CLASS_PARAM =
+        "azkaban.event.reporting.class";
+    public static final String AZKABAN_EVENT_REPORTING_ENABLED = "azkaban.event.reporting.enabled";
+    public static final String AZKABAN_EVENT_REPORTING_KAFKA_BROKERS =
+        "azkaban.event.reporting.kafka.brokers";
+    public static final String AZKABAN_EVENT_REPORTING_KAFKA_TOPIC =
+        "azkaban.event.reporting.kafka.topic";
+    public static final String AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL =
+        "azkaban.event.reporting.kafka.schema.registry.url";
+
     /*
      * The max number of artifacts retained per project.
      * Accepted Values:
diff --git a/azkaban-common/src/main/java/azkaban/event/Event.java b/azkaban-common/src/main/java/azkaban/event/Event.java
index fc79b4a..832dd99 100644
--- a/azkaban-common/src/main/java/azkaban/event/Event.java
+++ b/azkaban-common/src/main/java/azkaban/event/Event.java
@@ -16,16 +16,17 @@
 
 package azkaban.event;
 
+import azkaban.spi.EventType;
 import com.google.common.base.Preconditions;
 
 public class Event {
 
   private final Object runner;
-  private final Type type;
+  private final EventType type;
   private final EventData eventData;
   private final long time;
 
-  private Event(final Object runner, final Type type, final EventData eventData) {
+  private Event(final Object runner, final EventType type, final EventData eventData) {
     this.runner = runner;
     this.type = type;
     this.eventData = eventData;
@@ -41,7 +42,7 @@ public class Event {
    * @return New Event instance.
    * @throws NullPointerException if EventData is null.
    */
-  public static Event create(final Object runner, final Type type, final EventData eventData)
+  public static Event create(final Object runner, final EventType type, final EventData eventData)
       throws NullPointerException {
     Preconditions.checkNotNull(eventData, "EventData was null");
     return new Event(runner, type, eventData);
@@ -51,7 +52,7 @@ public class Event {
     return this.runner;
   }
 
-  public Type getType() {
+  public EventType getType() {
     return this.type;
   }
 
@@ -63,14 +64,5 @@ public class Event {
     return this.eventData;
   }
 
-  public enum Type {
-    FLOW_STARTED,
-    FLOW_FINISHED,
-    JOB_STARTED,
-    JOB_FINISHED,
-    JOB_STATUS_CHANGED,
-    EXTERNAL_FLOW_UPDATED,
-    EXTERNAL_JOB_UPDATED
-  }
 
 }
diff --git a/azkaban-exec-server/build.gradle b/azkaban-exec-server/build.gradle
index 0436e3d..c6030b9 100644
--- a/azkaban-exec-server/build.gradle
+++ b/azkaban-exec-server/build.gradle
@@ -4,6 +4,7 @@ dependencies {
     compile(project(':azkaban-common'))
 
     compile deps.kafkaLog4jAppender
+    compile deps.gobblinKafka
 
     runtime(project(':azkaban-hadoop-security-plugin'))
 
@@ -13,6 +14,16 @@ dependencies {
     testRuntime deps.h2
 }
 
+configurations.compile {
+    exclude group: 'com.linkedin.gobblin', module: 'gobblin-api'
+    exclude group: 'com.linkedin.gobblin', module: 'gobblin-metrics-graphite'
+    exclude group: 'com.linkedin.gobblin', module: 'gobblin-metrics-hadoop'
+    exclude group: 'com.linkedin.gobblin', module: 'gobblin-metrics-influxdb'
+    exclude group: 'com.linkedin.gobblin', module: 'gobblin-runtime'
+
+    exclude group: 'org.projectlombok', module: 'lombok'
+}
+
 distributions {
     main {
         contents {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index 683ca35..75f84ef 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -17,9 +17,21 @@
 
 package azkaban.execapp;
 
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_CLASS_PARAM;
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_ENABLED;
+
+import azkaban.execapp.reporter.AzkabanKafkaAvroEventReporter;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.JdbcExecutorLoader;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.utils.Props;
 import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.log4j.Logger;
 
 /**
  * This Guice module is currently a one place container for all bindings in the current module. This
@@ -28,9 +40,47 @@ import com.google.inject.AbstractModule;
  */
 public class AzkabanExecServerModule extends AbstractModule {
 
+  private static final Logger logger = Logger.getLogger(AzkabanExecServerModule.class);
+
   @Override
   protected void configure() {
     install(new ExecJettyServerModule());
     bind(ExecutorLoader.class).to(JdbcExecutorLoader.class);
   }
+
+  @Inject
+  @Provides
+  @Singleton
+  public AzkabanEventReporter createAzkabanEventReporter(final Props props) {
+    final boolean eventReporterEnabled =
+        props.getBoolean(AZKABAN_EVENT_REPORTING_ENABLED, false);
+
+    if (!eventReporterEnabled) {
+      logger.info("Event reporter is not enabled");
+      return null;
+    }
+
+    final Class<?> eventReporterClass =
+        props.getClass(AZKABAN_EVENT_REPORTING_CLASS_PARAM, AzkabanKafkaAvroEventReporter.class);
+    if (eventReporterClass != null && eventReporterClass.getConstructors().length > 0) {
+      this.logger.info("Loading event reporter class " + eventReporterClass.getName());
+      try {
+        final Constructor<?> eventReporterClassConstructor =
+            eventReporterClass.getConstructor(Props.class);
+        return (AzkabanEventReporter) eventReporterClassConstructor.newInstance(props);
+      } catch (final InvocationTargetException e) {
+        this.logger.error(e.getTargetException().getMessage());
+        if (e.getTargetException() instanceof IllegalArgumentException) {
+          throw new IllegalArgumentException(e);
+        } else {
+          throw new RuntimeException(e);
+        }
+      } catch (final Exception e) {
+        this.logger.error("Could not instantiate EventReporter " + eventReporterClass.getName());
+        throw new RuntimeException(e);
+      }
+    }
+    return null;
+  }
+
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
index 30f96ce..44bcc59 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
@@ -14,6 +14,7 @@ import azkaban.execapp.jmx.JmxJobCallback;
 import azkaban.execapp.jmx.JmxJobCallbackMBean;
 import azkaban.executor.Status;
 import azkaban.jobcallback.JobCallbackStatusEnum;
+import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import java.net.InetAddress;
@@ -108,9 +109,9 @@ public class JobCallbackManager implements EventListener {
 
     if (event.getRunner() instanceof JobRunner) {
       try {
-        if (event.getType() == Event.Type.JOB_STARTED) {
+        if (event.getType() == EventType.JOB_STARTED) {
           processJobCallOnStart(event);
-        } else if (event.getType() == Event.Type.JOB_FINISHED) {
+        } else if (event.getType() == EventType.JOB_FINISHED) {
           processJobCallOnFinish(event);
         }
       } catch (final Throwable e) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
index 1376249..f90470a 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -17,12 +17,12 @@
 package azkaban.execapp.event;
 
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventData;
 import azkaban.event.EventListener;
 import azkaban.execapp.FlowRunner;
 import azkaban.execapp.JobRunner;
 import azkaban.executor.ExecutableNode;
+import azkaban.spi.EventType;
 
 public class LocalFlowWatcher extends FlowWatcher {
 
@@ -58,7 +58,7 @@ public class LocalFlowWatcher extends FlowWatcher {
 
     @Override
     public void handleEvent(final Event event) {
-      if (event.getType() == Type.JOB_FINISHED) {
+      if (event.getType() == EventType.JOB_FINISHED) {
         if (event.getRunner() instanceof FlowRunner) {
           // The flow runner will finish a job without it running
           final EventData eventData = event.getData();
@@ -72,7 +72,7 @@ public class LocalFlowWatcher extends FlowWatcher {
           System.out.println(node + " looks like " + node.getStatus());
           handleJobStatusChange(node.getNestedId(), node.getStatus());
         }
-      } else if (event.getType() == Type.FLOW_FINISHED) {
+      } else if (event.getType() == EventType.FLOW_FINISHED) {
         stopWatcher();
       }
     }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 5d5af56..39ef01b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -16,9 +16,10 @@
 
 package azkaban.execapp;
 
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
+
 import azkaban.ServiceProvider;
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventData;
 import azkaban.event.EventHandler;
 import azkaban.event.EventListener;
@@ -42,6 +43,8 @@ import azkaban.metric.MetricReportManager;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
 import azkaban.sla.SlaOption;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.SwapQueue;
@@ -91,10 +94,12 @@ public class FlowRunner extends EventHandler implements Runnable {
   private final Props azkabanProps;
   private final Map<String, Props> sharedProps = new HashMap<>();
   private final JobRunnerEventListener listener = new JobRunnerEventListener();
+  private final FlowRunnerEventListener flowListener = new FlowRunnerEventListener();
   private final Set<JobRunner> activeJobRunners = Collections
       .newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
   // Thread safe swap queue for finishedExecutions.
   private final SwapQueue<ExecutableNode> finishedNodes;
+  private final AzkabanEventReporter azkabanEventReporter;
   private Logger logger;
   private Appender flowAppender;
   private File logFile;
@@ -104,21 +109,16 @@ public class FlowRunner extends EventHandler implements Runnable {
   // Used for pipelining
   private Integer pipelineLevel = null;
   private Integer pipelineExecId = null;
-
   // Watches external flows for execution.
   private FlowWatcher watcher = null;
-
   private Set<String> proxyUsers = null;
   private boolean validateUserProxy;
-
   private String jobLogFileSize = "5MB";
   private int jobLogNumFiles = 4;
-
   private boolean flowPaused = false;
   private boolean flowFailed = false;
   private boolean flowFinished = false;
   private boolean flowKilled = false;
-
   // The following is state that will trigger a retry of all failed jobs
   private boolean retryFailedJobs = false;
 
@@ -127,9 +127,10 @@ public class FlowRunner extends EventHandler implements Runnable {
    */
   public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
       final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
-      final Props azkabanProps)
+      final Props azkabanProps, final AzkabanEventReporter azkabanEventReporter)
       throws ExecutorManagerException {
-    this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps);
+    this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps,
+        azkabanEventReporter);
   }
 
   /**
@@ -137,7 +138,8 @@ public class FlowRunner extends EventHandler implements Runnable {
    */
   public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
       final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
-      final ExecutorService executorService, final Props azkabanProps)
+      final ExecutorService executorService, final Props azkabanProps,
+      final AzkabanEventReporter azkabanEventReporter)
       throws ExecutorManagerException {
     this.execId = flow.getExecutionId();
     this.flow = flow;
@@ -154,10 +156,15 @@ public class FlowRunner extends EventHandler implements Runnable {
     this.executorService = executorService;
     this.finishedNodes = new SwapQueue<>();
     this.azkabanProps = azkabanProps;
+    // Add the flow listener only if a non-null eventReporter is available.
+    if (azkabanEventReporter != null) {
+      this.addListener(this.flowListener);
+    }
 
     // Create logger and execution dir in flowRunner initialization instead of flow runtime to avoid NPE
     // where the uninitialized logger is used in flow preparing state
     createLogger(this.flow.getFlowId());
+    this.azkabanEventReporter = azkabanEventReporter;
   }
 
   public FlowRunner setFlowWatcher(final FlowWatcher watcher) {
@@ -201,7 +208,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       loadAllProperties();
 
       this.fireEventListeners(
-          Event.create(this, Type.FLOW_STARTED, new EventData(this.getExecutableFlow())));
+          Event.create(this, EventType.FLOW_STARTED, new EventData(this.getExecutableFlow())));
       runFlow();
     } catch (final Throwable t) {
       if (this.logger != null) {
@@ -226,7 +233,8 @@ public class FlowRunner extends EventHandler implements Runnable {
         closeLogger();
         updateFlow();
       } finally {
-        this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED, new EventData(this.flow)));
+        this.fireEventListeners(
+            Event.create(this, EventType.FLOW_FINISHED, new EventData(this.flow)));
       }
     }
   }
@@ -288,7 +296,6 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
   }
 
-
   /**
    * setup logger and execution dir for the flowId
    */
@@ -559,7 +566,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   private void finishExecutableNode(final ExecutableNode node) {
     this.finishedNodes.add(node);
     final EventData eventData = new EventData(node.getStatus(), node.getNestedId());
-    fireEventListeners(Event.create(this, Type.JOB_FINISHED, eventData));
+    fireEventListeners(Event.create(this, EventType.JOB_FINISHED, eventData));
   }
 
   private void finalizeFlow(final ExecutableFlowBase flow) {
@@ -1099,20 +1106,79 @@ public class FlowRunner extends EventHandler implements Runnable {
     return ImmutableSet.copyOf(this.activeJobRunners);
   }
 
+  // Class helps report the flow start and stop events.
+  private class FlowRunnerEventListener implements EventListener {
+
+    public FlowRunnerEventListener() {
+    }
+
+    private synchronized Map<String, String> getFlowMetadata(final FlowRunner flowRunner) {
+      final ExecutableFlow flow = flowRunner.getExecutableFlow();
+      final Props props = ServiceProvider.SERVICE_PROVIDER.getInstance(Props.class);
+      final Map<String, String> metaData = new HashMap<>();
+      metaData.put("flowName", flow.getId());
+      metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
+      metaData.put("projectName", flow.getProjectName());
+      metaData.put("submitUser", flow.getSubmitUser());
+      metaData.put("executionId", String.valueOf(flow.getExecutionId()));
+      metaData.put("startTime", String.valueOf(flow.getStartTime()));
+      metaData.put("submitTime", String.valueOf(flow.getSubmitTime()));
+      return metaData;
+    }
+
+    @Override
+    public synchronized void handleEvent(final Event event) {
+      if (event.getType() == EventType.FLOW_STARTED) {
+        final FlowRunner flowRunner = (FlowRunner) event.getRunner();
+        final ExecutableFlow flow = flowRunner.getExecutableFlow();
+        FlowRunner.this.logger.info("Flow started: " + flow.getId());
+        FlowRunner.this.azkabanEventReporter.report(event.getType(), getFlowMetadata(flowRunner));
+      } else if (event.getType() == EventType.FLOW_FINISHED) {
+        final FlowRunner flowRunner = (FlowRunner) event.getRunner();
+        final ExecutableFlow flow = flowRunner.getExecutableFlow();
+        FlowRunner.this.logger.info("Flow ended: " + flow.getId());
+        final Map<String, String> flowMetadata = getFlowMetadata(flowRunner);
+        flowMetadata.put("endTime", String.valueOf(flow.getEndTime()));
+        flowMetadata.put("flowStatus", flow.getStatus().name());
+        FlowRunner.this.azkabanEventReporter.report(event.getType(), flowMetadata);
+      }
+    }
+  }
+
   private class JobRunnerEventListener implements EventListener {
 
     public JobRunnerEventListener() {
     }
 
+    private synchronized Map<String, String> getJobMetadata(final JobRunner jobRunner) {
+      final ExecutableNode node = jobRunner.getNode();
+      final Props props = ServiceProvider.SERVICE_PROVIDER.getInstance(Props.class);
+      final Map<String, String> metaData = new HashMap<>();
+      metaData.put("jobId", node.getId());
+      metaData.put("executionID", String.valueOf(node.getExecutableFlow().getExecutionId()));
+      metaData.put("flowName", node.getExecutableFlow().getId());
+      metaData.put("startTime", String.valueOf(node.getStartTime()));
+      metaData.put("jobType", String.valueOf(node.getType()));
+      metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
+      metaData.put("jobProxyUser",
+          jobRunner.getProps().getString("user.to.proxy", null));
+      return metaData;
+    }
+
     @Override
     public synchronized void handleEvent(final Event event) {
-
-      if (event.getType() == Type.JOB_STATUS_CHANGED) {
+      if (event.getType() == EventType.JOB_STATUS_CHANGED) {
         updateFlow();
-      } else if (event.getType() == Type.JOB_FINISHED) {
-        final JobRunner runner = (JobRunner) event.getRunner();
-        final ExecutableNode node = runner.getNode();
+      } else if (event.getType() == EventType.JOB_FINISHED) {
         final EventData eventData = event.getData();
+        final JobRunner jobRunner = (JobRunner) event.getRunner();
+        final ExecutableNode node = jobRunner.getNode();
+        if (FlowRunner.this.azkabanEventReporter != null) {
+          final Map<String, String> jobMetadata = getJobMetadata(jobRunner);
+          jobMetadata.put("jobStatus", node.getStatus().name());
+          jobMetadata.put("endTime", String.valueOf(node.getEndTime()));
+          FlowRunner.this.azkabanEventReporter.report(event.getType(), jobMetadata);
+        }
         final long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
         synchronized (FlowRunner.this.mainSyncObj) {
           FlowRunner.this.logger.info("Job " + eventData.getNestedId() + " finished with status "
@@ -1127,12 +1193,18 @@ public class FlowRunner extends EventHandler implements Runnable {
           }
 
           FlowRunner.this.finishedNodes.add(node);
-          FlowRunner.this.activeJobRunners.remove(runner);
+          FlowRunner.this.activeJobRunners.remove(jobRunner);
           node.getParentFlow().setUpdateTime(System.currentTimeMillis());
           interrupt();
           fireEventListeners(event);
         }
-      } else if (event.getType() == Type.JOB_STARTED) {
+      } else if (event.getType() == EventType.JOB_STARTED) {
+        final EventData eventData = event.getData();
+        FlowRunner.this.logger.info("Job Started: " + eventData.getNestedId());
+        if (FlowRunner.this.azkabanEventReporter != null) {
+          final JobRunner jobRunner = (JobRunner) event.getRunner();
+          FlowRunner.this.azkabanEventReporter.report(event.getType(), getJobMetadata(jobRunner));
+        }
         // add job level checker
         final TriggerManager triggerManager = ServiceProvider.SERVICE_PROVIDER
             .getInstance(TriggerManager.class);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index b972c7d..c1b5c87 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -35,6 +35,8 @@ import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectWhitelist;
 import azkaban.project.ProjectWhitelist.WhitelistType;
 import azkaban.sla.SlaOption;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
 import azkaban.storage.StorageManager;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
@@ -61,6 +63,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.apache.commons.io.FileUtils;
@@ -116,7 +119,7 @@ public class FlowRunnerManager implements EventListener,
   private final JobTypeManager jobtypeManager;
   private final FlowPreparer flowPreparer;
   private final TriggerManager triggerManager;
-
+  private final AzkabanEventReporter azkabanEventReporter;
 
   private final Props azkabanProps;
   private final File executionDirectory;
@@ -151,11 +154,13 @@ public class FlowRunnerManager implements EventListener,
       final ExecutorLoader executorLoader,
       final ProjectLoader projectLoader,
       final StorageManager storageManager,
-      final TriggerManager triggerManager) throws IOException {
+      final TriggerManager triggerManager,
+      @Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
     this.azkabanProps = props;
 
     this.executionDirRetention = props.getLong("execution.dir.retention",
         this.executionDirRetention);
+    this.azkabanEventReporter = azkabanEventReporter;
     logger.info("Execution dir retention set to " + this.executionDirRetention + " ms");
 
     this.executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
@@ -358,7 +363,7 @@ public class FlowRunnerManager implements EventListener,
 
     final FlowRunner runner =
         new FlowRunner(flow, this.executorLoader, this.projectLoader, this.jobtypeManager,
-            this.azkabanProps);
+            this.azkabanProps, this.azkabanEventReporter);
     runner.setFlowWatcher(watcher)
         .setJobLogSettings(this.jobLogChunkSize, this.jobLogNumFiles)
         .setValidateProxyUser(this.validateProxyUser)
@@ -485,16 +490,16 @@ public class FlowRunnerManager implements EventListener,
 
   @Override
   public void handleEvent(final Event event) {
-    if (event.getType() == Event.Type.FLOW_FINISHED || event.getType() == Event.Type.FLOW_STARTED) {
+    if (event.getType() == EventType.FLOW_FINISHED || event.getType() == EventType.FLOW_STARTED) {
       final FlowRunner flowRunner = (FlowRunner) event.getRunner();
       final ExecutableFlow flow = flowRunner.getExecutableFlow();
 
-      if (event.getType() == Event.Type.FLOW_FINISHED) {
+      if (event.getType() == EventType.FLOW_FINISHED) {
         this.recentlyFinishedFlows.put(flow.getExecutionId(), flow);
         logger.info("Flow " + flow.getExecutionId()
             + " is finished. Adding it to recently finished flows list.");
         this.runningFlows.remove(flow.getExecutionId());
-      } else if (event.getType() == Event.Type.FLOW_STARTED) {
+      } else if (event.getType() == EventType.FLOW_STARTED) {
         // add flow level SLA checker
         this.triggerManager
             .addTrigger(flow.getExecutionId(), SlaOption.getFlowLevelSLAOptions(flow));
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
index 461c4b1..24e2bef 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
@@ -6,6 +6,7 @@ import azkaban.event.EventListener;
 import azkaban.execapp.JobRunner;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.Status;
+import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import java.util.HashMap;
 import java.util.Map;
@@ -108,16 +109,16 @@ public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
             + eventData.getStatus());
       }
 
-      if (event.getType() == Event.Type.JOB_STARTED) {
+      if (event.getType() == EventType.JOB_STARTED) {
         this.runningJobCount.incrementAndGet();
-      } else if (event.getType() == Event.Type.JOB_FINISHED) {
+      } else if (event.getType() == EventType.JOB_FINISHED) {
         this.totalExecutedJobCount.incrementAndGet();
         if (this.runningJobCount.intValue() > 0) {
           this.runningJobCount.decrementAndGet();
         } else {
           logger.warn("runningJobCount not messed up, it is already zero "
               + "and we are trying to decrement on job event "
-              + Event.Type.JOB_FINISHED);
+              + EventType.JOB_FINISHED);
         }
 
         if (eventData.getStatus() == Status.FAILED) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 85f5a4c..84fff8d 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -18,7 +18,6 @@ package azkaban.execapp;
 
 import azkaban.Constants;
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventData;
 import azkaban.event.EventHandler;
 import azkaban.execapp.event.BlockingStatus;
@@ -34,6 +33,7 @@ import azkaban.jobExecutor.JavaProcessJob;
 import azkaban.jobExecutor.Job;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
+import azkaban.spi.EventType;
 import azkaban.utils.ExternalLinkUtils;
 import azkaban.utils.PatternLayoutEscaped;
 import azkaban.utils.Props;
@@ -412,12 +412,12 @@ public class JobRunner extends EventHandler implements Runnable {
       if (quickFinish) {
         this.node.setStartTime(time);
         fireEvent(
-            Event.create(this, Type.JOB_STARTED,
+            Event.create(this, EventType.JOB_STARTED,
                 new EventData(nodeStatus, this.node.getNestedId())));
         this.node.setEndTime(time);
         fireEvent(
             Event
-                .create(this, Type.JOB_FINISHED,
+                .create(this, EventType.JOB_FINISHED,
                     new EventData(nodeStatus, this.node.getNestedId())));
         return true;
       }
@@ -580,13 +580,13 @@ public class JobRunner extends EventHandler implements Runnable {
     Status finalStatus = this.node.getStatus();
     uploadExecutableNode();
     if (!errorFound && !isKilled()) {
-      fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(this.node)));
+      fireEvent(Event.create(this, EventType.JOB_STARTED, new EventData(this.node)));
 
       final Status prepareStatus = prepareJob();
       if (prepareStatus != null) {
         // Writes status to the db
         writeStatus();
-        fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED,
+        fireEvent(Event.create(this, EventType.JOB_STATUS_CHANGED,
             new EventData(prepareStatus, this.node.getNestedId())));
         finalStatus = runJob();
       } else {
@@ -609,7 +609,7 @@ public class JobRunner extends EventHandler implements Runnable {
         "Finishing job " + this.jobId + getNodeRetryLog() + " at " + this.node.getEndTime()
             + " with status " + this.node.getStatus());
 
-    fireEvent(Event.create(this, Type.JOB_FINISHED,
+    fireEvent(Event.create(this, EventType.JOB_FINISHED,
         new EventData(finalStatus, this.node.getNestedId())), false);
     finalizeLogFile(this.node.getAttempt());
     finalizeAttachmentFile();
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
index 6116583..b48b85f 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -17,13 +17,13 @@
 package azkaban.execapp.metric;
 
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
 import azkaban.execapp.FlowRunner;
 import azkaban.executor.Status;
 import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.spi.EventType;
 
 /**
  * Metric to keep track of number of failed flows in between the tracking events
@@ -47,7 +47,7 @@ public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> imple
    */
   @Override
   public synchronized void handleEvent(final Event event) {
-    if (event.getType() == Type.FLOW_FINISHED) {
+    if (event.getType() == EventType.FLOW_FINISHED) {
       final FlowRunner runner = (FlowRunner) event.getRunner();
       if (runner != null && runner.getExecutableFlow().getStatus().equals(Status.FAILED)) {
         this.value = this.value + 1;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
index b069b0b..7ad0a46 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -17,12 +17,12 @@
 package azkaban.execapp.metric;
 
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
 import azkaban.executor.Status;
 import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.spi.EventType;
 
 /**
  * Metric to keep track of number of failed jobs in between the tracking events
@@ -45,7 +45,8 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
    */
   @Override
   public synchronized void handleEvent(final Event event) {
-    if (event.getType() == Type.JOB_FINISHED && Status.FAILED.equals(event.getData().getStatus())) {
+    if (event.getType() == EventType.JOB_FINISHED && Status.FAILED
+        .equals(event.getData().getStatus())) {
       this.value = this.value + 1;
     }
   }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index 5be6b68..a6ac7ee 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -17,11 +17,11 @@
 package azkaban.execapp.metric;
 
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
 import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.spi.EventType;
 
 /**
  * Metric to keep track of number of running jobs in Azkaban exec server
@@ -49,9 +49,9 @@ public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> imple
    */
   @Override
   public synchronized void handleEvent(final Event event) {
-    if (event.getType() == Type.JOB_STARTED) {
+    if (event.getType() == EventType.JOB_STARTED) {
       this.value = this.value + 1;
-    } else if (event.getType() == Type.JOB_FINISHED) {
+    } else if (event.getType() == EventType.JOB_FINISHED) {
       this.value = this.value - 1;
     }
   }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaAvroEventReporter.java b/azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaAvroEventReporter.java
new file mode 100644
index 0000000..fa1040e
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaAvroEventReporter.java
@@ -0,0 +1,124 @@
+package azkaban.execapp.reporter;
+
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_KAFKA_BROKERS;
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL;
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_KAFKA_TOPIC;
+
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
+import azkaban.utils.Props;
+import com.google.common.base.Preconditions;
+import gobblin.metrics.MetricContext;
+import gobblin.metrics.event.EventSubmitter;
+import gobblin.metrics.kafka.KafkaAvroEventReporter;
+import gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import gobblin.metrics.kafka.KafkaEventReporter;
+import gobblin.metrics.kafka.KafkaPusher;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Default implementation of the <code>AzkabanEventReporter</code> class.
+ * An instance of this class is injected via guice into the <code>FlowRunnerManager</code>
+ * class when event reporting is enabled.
+ */
+public class AzkabanKafkaAvroEventReporter implements AzkabanEventReporter {
+
+  private static final Logger logger = Logger.getLogger(AzkabanKafkaAvroEventReporter.class);
+  private static final String ROOT_CONTEXT_NAME = "AzkabanKafkaEvents";
+  private static final String AZKABAN_FLOW_EVENTS_NAMESPACE = "AzkabanFlowEvents";
+  private static final String AZKABAN_JOB_EVENTS_NAMESPACE = "AzkabanJobEvents";
+  private final Properties kafkaProps = new Properties();
+  private MetricContext rootContext;
+  private KafkaAvroEventReporter kafkaAvroEventReporter;
+
+  // For testing only.
+  public AzkabanKafkaAvroEventReporter(final KafkaAvroEventReporter kafkaAvroEventReporter,
+      final Props props) {
+    this.kafkaAvroEventReporter = kafkaAvroEventReporter;
+    initKafkaProperties(props);
+  }
+
+  // Constructed via guice provider
+  public AzkabanKafkaAvroEventReporter(final Props props) {
+
+    initKafkaProperties(props);
+
+    final String eventReportingKafkaBrokers =
+        props.get(AZKABAN_EVENT_REPORTING_KAFKA_BROKERS);
+    final String eventReportingKafkaTopic =
+        props.get(AZKABAN_EVENT_REPORTING_KAFKA_TOPIC);
+
+    this.rootContext = MetricContext.builder(ROOT_CONTEXT_NAME).build();
+    final KafkaEventReporter.Builder<? extends KafkaAvroEventReporter.Builder> builder =
+        KafkaAvroEventReporter.Factory.forContext(this.rootContext);
+    final KafkaPusher kafkaPusher =
+        new AzkabanKafkaPusher(eventReportingKafkaBrokers, eventReportingKafkaTopic);
+
+    try {
+      this.kafkaAvroEventReporter = builder.withKafkaPusher(kafkaPusher)
+          .withSchemaRegistry(new KafkaAvroSchemaRegistry(this.kafkaProps))
+          .build(eventReportingKafkaBrokers, eventReportingKafkaTopic);
+      logger.info("Initialized a new kafkaAvroEventReporter");
+    } catch (final Exception e) {
+      logger.error("Exception while initializing kafka reporter", e);
+    }
+  }
+
+  private void initKafkaProperties(final Props props) {
+    Preconditions.checkArgument(props.containsKey(AZKABAN_EVENT_REPORTING_KAFKA_BROKERS),
+        String.format("Property %s not provided.", AZKABAN_EVENT_REPORTING_KAFKA_BROKERS));
+    Preconditions.checkArgument(props.containsKey(AZKABAN_EVENT_REPORTING_KAFKA_TOPIC),
+        String.format("Property %s not provided.", AZKABAN_EVENT_REPORTING_KAFKA_TOPIC));
+    Preconditions
+        .checkArgument(props.containsKey(AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL),
+            String.format("Property %s not provided.",
+                AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL));
+    this.kafkaProps.put("kafka.schema.registry.url",
+        props.get(AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL));
+  }
+
+  /**
+   * @param eventType - The event type to be reported via kafka.
+   * @param metaData - Additional data to accompany the event.
+   */
+  @Override
+  public boolean report(final EventType eventType, final Map<String, String> metaData) {
+    if (this.kafkaAvroEventReporter != null) {
+      switch (eventType) {
+        case FLOW_STARTED:
+          getEventSubmitter(AZKABAN_FLOW_EVENTS_NAMESPACE).submit("flowStarted", metaData);
+          break;
+        case FLOW_FINISHED:
+          getEventSubmitter(AZKABAN_FLOW_EVENTS_NAMESPACE).submit("flowFinished", metaData);
+          break;
+        case JOB_STARTED:
+          getEventSubmitter(AZKABAN_JOB_EVENTS_NAMESPACE).submit("jobStarted", metaData);
+          break;
+        case JOB_FINISHED:
+          getEventSubmitter(AZKABAN_JOB_EVENTS_NAMESPACE).submit("jobFinished", metaData);
+          break;
+        default:
+          logger.warn("Failed to report the event. Unrecognized event type "
+              + eventType.name());
+          return false;
+      }
+      logger.info("Sent " + eventType.name() + " event via kafka");
+      this.kafkaAvroEventReporter.report();
+    } else {
+      logger.warn("Kafka reporter isn't initialized. Failed to report the event");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * @param namespace - The event namespace
+   * @return - Returns an <code>EventSubmitter</code>
+   */
+  private EventSubmitter getEventSubmitter(final String namespace) {
+    return new EventSubmitter.Builder(this.rootContext, namespace).build();
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaPusher.java b/azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaPusher.java
new file mode 100644
index 0000000..11537d2
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaPusher.java
@@ -0,0 +1,30 @@
+package azkaban.execapp.reporter;
+
+import com.google.common.io.Closer;
+import gobblin.metrics.kafka.KafkaPusher;
+import gobblin.metrics.kafka.ProducerCloseable;
+import java.util.Properties;
+import kafka.producer.ProducerConfig;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Extends the KafkaPusher class to create an async type kafka producer.
+ */
+public class AzkabanKafkaPusher extends KafkaPusher {
+
+  private static final Logger logger = Logger.getLogger(AzkabanKafkaPusher.class.getName());
+
+  AzkabanKafkaPusher(final String brokers, final String topic) {
+    super(brokers, topic);
+  }
+
+  @Override
+  public ProducerCloseable<String, byte[]> createProducer(final ProducerConfig config) {
+    final Properties props = config.props().props();
+    props.put("producer.type", "async");
+    final ProducerConfig newConfig = new ProducerConfig(props);
+    logger.info("Kafka producer type is set to " + newConfig.producerType());
+    return Closer.create().register(new ProducerCloseable<String, byte[]>(newConfig));
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecServerModuleTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecServerModuleTest.java
new file mode 100644
index 0000000..df48c28
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecServerModuleTest.java
@@ -0,0 +1,111 @@
+package azkaban.execapp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
+import azkaban.utils.Props;
+import java.util.Map;
+import org.junit.Test;
+
+public class AzkabanExecServerModuleTest {
+
+  /**
+   * Verify that alternate implementation of the <code>AzkabanEventReporter</code>
+   * is initialized.
+   */
+  @Test
+  public void testCreateAzkabanEventReporter() {
+    final AzkabanExecServerModule azkabanExecServerModule = new AzkabanExecServerModule();
+    final Props props = new Props();
+    props.put("azkaban.event.reporting.enabled", "true");
+    props.put("azkaban.event.reporting.class",
+        "azkaban.execapp.AzkabanEventReporterTest1");
+    final AzkabanEventReporter azkabanEventReporter = azkabanExecServerModule
+        .createAzkabanEventReporter(props);
+    assertThat(azkabanEventReporter).isNotNull();
+    assertThat(azkabanEventReporter).isInstanceOf(AzkabanEventReporterTest1.class);
+  }
+
+  /**
+   * Verify that <code>IllegalArgumentException</code> is thrown when required properties
+   * are missing.
+   */
+  @Test
+  public void testAzkabanEventReporterInvalidProperties() {
+    final AzkabanExecServerModule azkabanExecServerModule = new AzkabanExecServerModule();
+    final Props props = new Props();
+    props.put("azkaban.event.reporting.enabled", "true");
+    props.put("azkaban.event.reporting.class",
+        "azkaban.execapp.reporter.AzkabanKafkaAvroEventReporter");
+    assertThatIllegalArgumentException()
+        .isThrownBy(() -> azkabanExecServerModule.createAzkabanEventReporter(props));
+  }
+
+  /**
+   * Verify that a <code>RuntimeException</code> is thrown when valid constructor is
+   * not found in the event reporter implementation.
+   */
+  @Test
+  public void testAzkabanEventReporterInvalidConstructor() {
+    final AzkabanExecServerModule azkabanExecServerModule = new AzkabanExecServerModule();
+    final Props props = new Props();
+    props.put("azkaban.event.reporting.enabled", "true");
+    props.put("azkaban.event.reporting.class",
+        "azkaban.execapp.AzkabanEventReporterTest3");
+    assertThatExceptionOfType(RuntimeException.class)
+        .isThrownBy(() -> azkabanExecServerModule.createAzkabanEventReporter(props));
+  }
+
+  /**
+   * Ensures that the event reporter is not initialized when the property 'event.reporter.enabled'
+   * is not set.
+   */
+  @Test
+  public void testEventReporterDisabled() {
+    final AzkabanExecServerModule azkabanExecServerModule = new AzkabanExecServerModule();
+    final AzkabanEventReporter azkabanEventReporter = azkabanExecServerModule
+        .createAzkabanEventReporter(new Props());
+    assertThat(azkabanEventReporter).isNull();
+  }
+
+}
+
+// Dummy implementation of the AzkabanEventReporter interface
+// with valid constructor.
+class AzkabanEventReporterTest1 implements AzkabanEventReporter {
+
+  public AzkabanEventReporterTest1(final Props props) {
+
+  }
+
+  @Override
+  public boolean report(final EventType eventType, final Map<String, String> metadata) {
+    return false;
+  }
+}
+
+// Dummy implementation of the AzkabanEventReporter interface, for test.
+// Valid constructor is not available.
+class AzkabanEventReporterTest2 implements AzkabanEventReporter {
+
+  @Override
+  public boolean report(final EventType eventType, final Map<String, String> metadata) {
+    return false;
+  }
+}
+
+// Dummy implementation of the AzkabanEventReporter with an invalid constructor
+class AzkabanEventReporterTest3 implements AzkabanEventReporter {
+
+  public AzkabanEventReporterTest3() {
+
+  }
+
+  @Override
+  public boolean report(final EventType eventType, final Map<String, String> metadata) {
+    return false;
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
index 681acd5..f585ea6 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
@@ -19,6 +19,7 @@ package azkaban.execapp.event;
 import static org.mockito.Mockito.mock;
 
 import azkaban.execapp.EventCollectorListener;
+import azkaban.execapp.EventReporterUtil;
 import azkaban.execapp.FlowRunner;
 import azkaban.execapp.FlowRunnerTestUtil;
 import azkaban.executor.ExecutableFlow;
@@ -30,6 +31,7 @@ import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
@@ -42,6 +44,8 @@ import org.junit.Test;
 
 public class LocalFlowWatcherTest {
 
+  private final AzkabanEventReporter azkabanEventReporter =
+      EventReporterUtil.getTestAzkabanEventReporter();
   private JobTypeManager jobtypeManager;
   private int dirVal = 0;
 
@@ -160,14 +164,14 @@ public class LocalFlowWatcherTest {
 
   private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
 
       // check it's start time is after the first's children.
       final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
       if (watchedNode == null) {
         continue;
       }
-      Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
 
       System.out.println("Node " + node.getId() + " start: "
           + node.getStartTime() + " dependent on " + watchedNode.getId() + " "
@@ -194,14 +198,14 @@ public class LocalFlowWatcherTest {
 
   private void testPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
 
       // check it's start time is after the first's children.
       final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
       if (watchedNode == null) {
         continue;
       }
-      Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
 
       long minDiff = Long.MAX_VALUE;
       for (final String watchedChild : watchedNode.getOutNodes()) {
@@ -209,7 +213,7 @@ public class LocalFlowWatcherTest {
         if (child == null) {
           continue;
         }
-        Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
+        Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
         final long diff = node.getStartTime() - child.getEndTime();
         minDiff = Math.min(minDiff, diff);
         System.out.println("Node " + node.getId() + " start: "
@@ -252,7 +256,7 @@ public class LocalFlowWatcherTest {
     }
     loader.uploadExecutableFlow(exFlow);
     final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
-        this.jobtypeManager, azkabanProps);
+        this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
     runner.setFlowWatcher(watcher);
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index f5b195a..b7cd06f 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -19,6 +19,7 @@ package azkaban.execapp.event;
 import static org.mockito.Mockito.mock;
 
 import azkaban.execapp.EventCollectorListener;
+import azkaban.execapp.EventReporterUtil;
 import azkaban.execapp.FlowRunner;
 import azkaban.execapp.FlowRunnerTestUtil;
 import azkaban.execapp.jmx.JmxJobMBeanManager;
@@ -32,6 +33,7 @@ import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
@@ -46,6 +48,8 @@ import org.junit.rules.TemporaryFolder;
 
 public class RemoteFlowWatcherTest {
 
+  private final AzkabanEventReporter azkabanEventReporter =
+      EventReporterUtil.getTestAzkabanEventReporter();
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private JobTypeManager jobtypeManager;
@@ -148,14 +152,14 @@ public class RemoteFlowWatcherTest {
 
   private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
 
       // check it's start time is after the first's children.
       final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
       if (watchedNode == null) {
         continue;
       }
-      Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
 
       System.out.println("Node " + node.getId() + " start: "
           + node.getStartTime() + " dependent on " + watchedNode.getId() + " "
@@ -180,14 +184,14 @@ public class RemoteFlowWatcherTest {
 
   private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
 
       // check it's start time is after the first's children.
       final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
       if (watchedNode == null) {
         continue;
       }
-      Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
 
       long minDiff = Long.MAX_VALUE;
       for (final String watchedChild : watchedNode.getOutNodes()) {
@@ -195,7 +199,7 @@ public class RemoteFlowWatcherTest {
         if (child == null) {
           continue;
         }
-        Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
+        Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
         final long diff = node.getStartTime() - child.getEndTime();
         minDiff = Math.min(minDiff, diff);
         Assert.assertTrue(
@@ -238,7 +242,7 @@ public class RemoteFlowWatcherTest {
 
     loader.uploadExecutableFlow(exFlow);
     final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
-        this.jobtypeManager, azkabanProps);
+        this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
     runner.setFlowWatcher(watcher);
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java b/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
index adf17e0..28fb57a 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
@@ -19,8 +19,8 @@ package azkaban.execapp;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
+import azkaban.spi.EventType;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -31,9 +31,9 @@ public class EventCollectorListener implements EventListener {
   public static final Object handleEvent = new Object();
   // CopyOnWriteArrayList allows concurrent iteration and modification
   private final List<Event> eventList = new CopyOnWriteArrayList<>();
-  private final HashSet<Event.Type> filterOutTypes = new HashSet<>();
+  private final HashSet<EventType> filterOutTypes = new HashSet<>();
 
-  public void setEventFilterOut(final Event.Type... types) {
+  public void setEventFilterOut(final EventType... types) {
     this.filterOutTypes.addAll(Arrays.asList(types));
   }
 
@@ -62,7 +62,7 @@ public class EventCollectorListener implements EventListener {
     return true;
   }
 
-  public void assertEvents(final Type... expected) {
+  public void assertEvents(final EventType... expected) {
     final Object[] captured = this.eventList.stream()
         .filter(event -> event.getRunner() != null)
         .map(event -> event.getType())
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/EventReporterUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/EventReporterUtil.java
new file mode 100644
index 0000000..9a34d8d
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/EventReporterUtil.java
@@ -0,0 +1,37 @@
+package azkaban.execapp;
+
+import static org.mockito.Mockito.mock;
+
+import azkaban.execapp.reporter.AzkabanKafkaAvroEventReporter;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.utils.Props;
+import gobblin.metrics.kafka.KafkaAvroEventReporter;
+import org.mockito.Mockito;
+
+public final class EventReporterUtil {
+  private EventReporterUtil() {
+
+  }
+
+  /**
+   *
+   * @return - Returns a mock <code>AzkabanEventReporter</code> instance.
+   */
+  public static AzkabanEventReporter getTestAzkabanEventReporter() {
+    final KafkaAvroEventReporter kafkaAvroEventReporter = mock(KafkaAvroEventReporter.class);
+    Mockito.doNothing().when(kafkaAvroEventReporter).report();
+    return new AzkabanKafkaAvroEventReporter(kafkaAvroEventReporter, getTestKafkaProps());
+  }
+
+  /**
+   * @return - Returns required kafka properties for test methods.
+   */
+  public static Props getTestKafkaProps() {
+    final Props kafkaProps = new Props();
+    kafkaProps.put("azkaban.event.reporting.kafka.brokers", "brokers.com:10950");
+    kafkaProps.put("azkaban.event.reporting.kafka.topic", "KafkaTopic");
+    kafkaProps
+        .put("azkaban.event.reporting.kafka.schema.registry.url", "registry.com/schemaRegistry");
+    return kafkaProps;
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 83897d5..21511cc 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -34,6 +34,7 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
@@ -61,6 +62,8 @@ public class FlowRunnerPipelineTest {
 
   private static int id = 101;
   private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+  private final AzkabanEventReporter azkabanEventReporter =
+      EventReporterUtil.getTestAzkabanEventReporter();
   private File workingDir;
   private JobTypeManager jobtypeManager;
   private ExecutorLoader fakeExecutorLoader;
@@ -677,8 +680,8 @@ public class FlowRunnerPipelineTest {
 
     final FlowRunner runner =
         new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
-            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
-
+            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
+            this.azkabanEventReporter);
     runner.addListener(eventCollector);
 
     return runner;
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 50e0eae..d202193 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -29,6 +29,7 @@ import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
@@ -61,6 +62,8 @@ public class FlowRunnerPropertyResolutionTest {
 
   private static int id = 101;
   private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+  private final AzkabanEventReporter azkabanEventReporter =
+      EventReporterUtil.getTestAzkabanEventReporter();
   private File workingDir;
   private JobTypeManager jobtypeManager;
   private ExecutorLoader fakeExecutorLoader;
@@ -215,7 +218,8 @@ public class FlowRunnerPropertyResolutionTest {
 
     final FlowRunner runner =
         new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
-            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
+            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
+            this.azkabanEventReporter);
     return runner;
   }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index c26a68e..e8ea268 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -20,8 +20,6 @@ import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.when;
 
-import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
@@ -33,6 +31,8 @@ import azkaban.jobExecutor.AllJobExecutorTests;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
@@ -48,6 +48,8 @@ import org.mockito.MockitoAnnotations;
 public class FlowRunnerTest extends FlowRunnerTestBase {
 
   private static final File TEST_DIR = ExecutionsTestUtil.getFlowDir("exectest1");
+  private final AzkabanEventReporter azkabanEventReporter =
+      EventReporterUtil.getTestAzkabanEventReporter();
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private File workingDir;
@@ -76,8 +78,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
   @Test
   public void exec1Normal() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
 
     startThread(this.runner);
@@ -97,14 +99,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job8", Status.SUCCEEDED);
     assertStatus("job10", Status.SUCCEEDED);
 
-    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
 
   @Test
   public void exec1Disabled() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     final ExecutableFlow exFlow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, TEST_DIR, "exec1", 1);
 
@@ -137,14 +139,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job8", Status.SUCCEEDED);
     assertStatus("job10", Status.SKIPPED);
 
-    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
 
   @Test
   public void exec1Failed() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     final ExecutableFlow flow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
 
@@ -168,14 +170,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job10", Status.CANCELLED);
     assertThreadShutDown();
 
-    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
 
   @Test
   public void exec1FailedKillAll() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     final ExecutableFlow flow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
     flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
@@ -200,14 +202,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job9", Status.CANCELLED);
     assertStatus("job10", Status.CANCELLED);
 
-    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
 
   @Test
   public void exec1FailedFinishRest() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     final ExecutableFlow flow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, TEST_DIR, "exec3", 1);
     flow.getExecutionOptions().setFailureAction(
@@ -231,14 +233,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job10", Status.CANCELLED);
     assertThreadShutDown();
 
-    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
 
   @Test
   public void execAndCancel() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
 
     startThread(this.runner);
@@ -261,14 +263,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
 
     assertFlowStatus(Status.KILLED);
 
-    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
 
   @Test
   public void execRetries() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     this.runner = createFlowRunner(this.loader, eventCollector, "exec4-retry");
 
     startThread(this.runner);
@@ -340,7 +342,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
 
     loader.uploadExecutableFlow(flow);
     final FlowRunner runner =
-        new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps);
+        new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
+            this.azkabanEventReporter);
 
     runner.addListener(eventCollector);
 
@@ -361,7 +364,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     loader.uploadExecutableFlow(exFlow);
 
     final FlowRunner runner =
-        new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps);
+        new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
+            this.azkabanEventReporter);
 
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 30ea40e..974df4c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -35,6 +35,7 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
@@ -97,6 +98,8 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
   private static int id = 101;
   private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+  private final AzkabanEventReporter azkabanEventReporter =
+      EventReporterUtil.getTestAzkabanEventReporter();
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private File workingDir;
@@ -1129,8 +1132,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
     final FlowRunner runner = new FlowRunner(
         this.fakeExecutorLoader.fetchExecutableFlow(exId), this.fakeExecutorLoader,
-        mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
-
+        mock(ProjectLoader.class), this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
     runner.addListener(eventCollector);
 
     return runner;
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index 765b141..6a162ed 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -17,7 +17,6 @@
 package azkaban.execapp;
 
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventData;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
@@ -28,6 +27,7 @@ import azkaban.executor.SleepJavaJob;
 import azkaban.executor.Status;
 import azkaban.jobExecutor.ProcessJob;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
@@ -82,12 +82,12 @@ public class JobRunnerTest {
         createJobRunner(1, "testJob", 1, false, loader, eventCollector);
     final ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
         || runner.getStatus() != Status.FAILED);
 
     runner.run();
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -102,7 +102,8 @@ public class JobRunnerTest {
 
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 
-    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
+    eventCollector
+        .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
   }
 
   @Ignore
@@ -131,7 +132,8 @@ public class JobRunnerTest {
     Assert.assertTrue(!runner.isKilled());
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 
-    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
+    eventCollector
+        .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
   }
 
   @Test
@@ -162,7 +164,7 @@ public class JobRunnerTest {
 
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
 
-    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_FINISHED);
+    eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
   }
 
   @Test
@@ -193,7 +195,7 @@ public class JobRunnerTest {
     Assert.assertTrue(outputProps == null);
     Assert.assertTrue(runner.getLogFilePath() == null);
     Assert.assertTrue(!runner.isKilled());
-    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_FINISHED);
+    eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
   }
 
   @Ignore
@@ -232,7 +234,8 @@ public class JobRunnerTest {
     Assert.assertTrue(logFile.exists());
     Assert.assertTrue(eventCollector.checkOrdering());
     Assert.assertTrue(runner.isKilled());
-    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
+    eventCollector
+        .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
   }
 
   @Ignore
@@ -246,11 +249,11 @@ public class JobRunnerTest {
     final long startTime = System.currentTimeMillis();
     final ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
     runner.run();
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -267,7 +270,8 @@ public class JobRunnerTest {
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 
     Assert.assertTrue(eventCollector.checkOrdering());
-    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
+    eventCollector
+        .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
   }
 
   @Test
@@ -280,7 +284,7 @@ public class JobRunnerTest {
     final long startTime = System.currentTimeMillis();
     final ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
     final Thread thread = new Thread(runner);
@@ -292,7 +296,7 @@ public class JobRunnerTest {
     runner.kill();
     StatusTestUtils.waitForStatus(node, Status.KILLED);
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -312,7 +316,7 @@ public class JobRunnerTest {
     Thread.sleep(1000L);
     Assert.assertEquals(2L, loader.getNodeUpdateCount("testJob").longValue());
     Assert.assertEquals(2L, (long) loader.getNodeUpdateCount("testJob"));
-    eventCollector.assertEvents(Type.JOB_FINISHED);
+    eventCollector.assertEvents(EventType.JOB_FINISHED);
   }
 
   private Props createProps(final int sleepSec, final boolean fail) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanEventReporterTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanEventReporterTest.java
new file mode 100644
index 0000000..ecd852d
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanEventReporterTest.java
@@ -0,0 +1,64 @@
+package azkaban.execapp.reporter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import azkaban.event.Event;
+import azkaban.event.EventData;
+import azkaban.execapp.EventReporterUtil;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
+import java.util.HashMap;
+import org.junit.Test;
+
+/**
+ * Tests for the default implementation of <code>AzkabanEventReporter</code> class.
+ */
+public class AzkabanEventReporterTest {
+
+
+  @Test
+  public void testAzkabanKafkaEventReporter() {
+    final AzkabanEventReporter azkabanKafkaAvroEventReporter
+        = EventReporterUtil.getTestAzkabanEventReporter();
+    final EventData eventData = mock(EventData.class);
+
+    final Object runnerObject = new Object();
+    boolean status;
+
+    // test flow started event
+    Event event = Event.create(runnerObject, EventType.FLOW_STARTED, eventData);
+    status = azkabanKafkaAvroEventReporter.report(event.getType(), new HashMap<>());
+    assertThat(status).isTrue();
+
+    // test flow finished event
+    event = Event.create(runnerObject, EventType.FLOW_FINISHED, eventData);
+    status = azkabanKafkaAvroEventReporter.report(event.getType(), new HashMap<>());
+    assertThat(status).isTrue();
+
+    // test job started event
+    event = Event.create(runnerObject, EventType.JOB_STARTED, eventData);
+    status = azkabanKafkaAvroEventReporter.report(event.getType(), new HashMap<>());
+    assertThat(status).isTrue();
+
+    // test job finished event
+    event = Event.create(runnerObject, EventType.JOB_FINISHED, eventData);
+    status = azkabanKafkaAvroEventReporter.report(event.getType(), new HashMap<>());
+    assertThat(status).isTrue();
+
+    // test un-supported event type
+    event = Event.create(runnerObject, EventType.JOB_STATUS_CHANGED, eventData);
+    status = azkabanKafkaAvroEventReporter.report(event.getType(), new HashMap<>());
+    assertThat(status).isFalse();
+  }
+
+  @Test
+  public void testNullAzkabanKafkaEventReporter() {
+    final AzkabanKafkaAvroEventReporter azkabanKafkaAvroEventReporter
+        = new AzkabanKafkaAvroEventReporter(null, EventReporterUtil.getTestKafkaProps());
+    final Event event = mock(Event.class);
+    final boolean status = azkabanKafkaAvroEventReporter.report(event.getType(), new HashMap<>());
+    assertThat(status).isFalse();
+  }
+
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanKafkaPusherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanKafkaPusherTest.java
new file mode 100644
index 0000000..cc45cc3
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanKafkaPusherTest.java
@@ -0,0 +1,19 @@
+package azkaban.execapp.reporter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class AzkabanKafkaPusherTest {
+
+  /**
+   * Test validates that an async kafka producer is created.
+   */
+  @Test
+  public void testCreateASyncKafkaProducer() {
+    final AzkabanKafkaPusher azkabanKafkaPusher =
+        new AzkabanKafkaPusher("broker.com:10251", "kafka.topic");
+    assertThat(azkabanKafkaPusher).isNotNull();
+  }
+
+}
diff --git a/azkaban-spi/src/main/java/azkaban/spi/AzkabanEventReporter.java b/azkaban-spi/src/main/java/azkaban/spi/AzkabanEventReporter.java
new file mode 100644
index 0000000..73c46d1
--- /dev/null
+++ b/azkaban-spi/src/main/java/azkaban/spi/AzkabanEventReporter.java
@@ -0,0 +1,19 @@
+package azkaban.spi;
+
+import java.util.Map;
+
+/**
+ * Implement this interface to report flow and job events. Event reporter
+ * can be turned on by setting the property {@code AZKABAN_EVENT_REPORTING_ENABLED} to true.
+ *
+ * By default, a KafkaAvroEventReporter is provided. Alternate implementations
+ * can be provided by setting the property {@code AZKABAN_EVENT_REPORTING_CLASS_PARAM}
+ * <br><br>
+ * The constructor will be called with a {@code azkaban.utils.Props} object passed as
+ * the only parameter. If such a constructor doesn't exist, then the AzkabanEventReporter
+ * instantiation will fail.
+ */
+public interface AzkabanEventReporter {
+
+  boolean report(EventType eventType, Map<String, String> metadata);
+}
diff --git a/azkaban-spi/src/main/java/azkaban/spi/EventType.java b/azkaban-spi/src/main/java/azkaban/spi/EventType.java
new file mode 100644
index 0000000..71f04f7
--- /dev/null
+++ b/azkaban-spi/src/main/java/azkaban/spi/EventType.java
@@ -0,0 +1,14 @@
+package azkaban.spi;
+
+/**
+ * Enum class defining the list of supported event types.
+ */
+public enum EventType {
+  FLOW_STARTED,
+  FLOW_FINISHED,
+  JOB_STARTED,
+  JOB_FINISHED,
+  JOB_STATUS_CHANGED,
+  EXTERNAL_FLOW_UPDATED,
+  EXTERNAL_JOB_UPDATED
+}

build.gradle 5(+4 -1)

diff --git a/build.gradle b/build.gradle
index c7f0093..eab8844 100644
--- a/build.gradle
+++ b/build.gradle
@@ -39,6 +39,9 @@ allprojects {
     repositories {
         mavenCentral()
         mavenLocal()
+        maven {
+            url "http://packages.confluent.io/maven/"
+        }
     }
 }
 
@@ -57,6 +60,7 @@ ext.deps = [
         dbcp2               : 'org.apache.commons:commons-dbcp2:2.1.1',
         dbutils             : 'commons-dbutils:commons-dbutils:1.5',
         fileupload          : 'commons-fileupload:commons-fileupload:1.2.1',
+        gobblinKafka        : 'com.linkedin.gobblin:gobblin-kafka-08:0.11.0',
         gson                : 'com.google.code.gson:gson:2.8.1',
         guava               : 'com.google.guava:guava:21.0',
         guice               : 'com.google.inject:guice:4.1.0',
@@ -140,7 +144,6 @@ subprojects {
             compile deps.log4j
             compile deps.guice
             compile deps.slf4j
-
             runtime deps.slf4jLog4j
 
             testCompile deps.assertj