azkaban-aplcache

Event reporting for azkaban events - No default implementation

10/5/2017 12:19:40 PM

Changes

build.gradle 2(+1 -1)

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index b60100f..754aa3a 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -127,6 +127,17 @@ public class Constants {
     public static final String AZKABAN_KEYTAB_PATH = "azkaban.keytab.path";
     public static final String PROJECT_TEMP_DIR = "project.temp.dir";
 
+    // Event reporting properties
+    public static final String AZKABAN_EVENT_REPORTING_CLASS_PARAM =
+        "azkaban.event.reporting.class";
+    public static final String AZKABAN_EVENT_REPORTING_ENABLED = "azkaban.event.reporting.enabled";
+    public static final String AZKABAN_EVENT_REPORTING_KAFKA_BROKERS =
+        "azkaban.event.reporting.kafka.brokers";
+    public static final String AZKABAN_EVENT_REPORTING_KAFKA_TOPIC =
+        "azkaban.event.reporting.kafka.topic";
+    public static final String AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL =
+        "azkaban.event.reporting.kafka.schema.registry.url";
+
     /*
      * The max number of artifacts retained per project.
      * Accepted Values:
diff --git a/azkaban-common/src/main/java/azkaban/event/Event.java b/azkaban-common/src/main/java/azkaban/event/Event.java
index fc79b4a..832dd99 100644
--- a/azkaban-common/src/main/java/azkaban/event/Event.java
+++ b/azkaban-common/src/main/java/azkaban/event/Event.java
@@ -16,16 +16,17 @@
 
 package azkaban.event;
 
+import azkaban.spi.EventType;
 import com.google.common.base.Preconditions;
 
 public class Event {
 
   private final Object runner;
-  private final Type type;
+  private final EventType type;
   private final EventData eventData;
   private final long time;
 
-  private Event(final Object runner, final Type type, final EventData eventData) {
+  private Event(final Object runner, final EventType type, final EventData eventData) {
     this.runner = runner;
     this.type = type;
     this.eventData = eventData;
@@ -41,7 +42,7 @@ public class Event {
    * @return New Event instance.
    * @throws NullPointerException if EventData is null.
    */
-  public static Event create(final Object runner, final Type type, final EventData eventData)
+  public static Event create(final Object runner, final EventType type, final EventData eventData)
       throws NullPointerException {
     Preconditions.checkNotNull(eventData, "EventData was null");
     return new Event(runner, type, eventData);
@@ -51,7 +52,7 @@ public class Event {
     return this.runner;
   }
 
-  public Type getType() {
+  public EventType getType() {
     return this.type;
   }
 
@@ -63,14 +64,5 @@ public class Event {
     return this.eventData;
   }
 
-  public enum Type {
-    FLOW_STARTED,
-    FLOW_FINISHED,
-    JOB_STARTED,
-    JOB_FINISHED,
-    JOB_STATUS_CHANGED,
-    EXTERNAL_FLOW_UPDATED,
-    EXTERNAL_JOB_UPDATED
-  }
 
 }
diff --git a/azkaban-exec-server/build.gradle b/azkaban-exec-server/build.gradle
index 1d35d4d..8749c19 100644
--- a/azkaban-exec-server/build.gradle
+++ b/azkaban-exec-server/build.gradle
@@ -4,6 +4,7 @@ dependencies {
     compile(project(':az-core'))
     compile(project(':azkaban-common'))
 
+    compile deps.jsr305
     compile deps.kafkaLog4jAppender
     runtime(project(':azkaban-hadoop-security-plugin'))
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index 683ca35..9f67448 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -17,9 +17,20 @@
 
 package azkaban.execapp;
 
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_CLASS_PARAM;
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_ENABLED;
+
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.JdbcExecutorLoader;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.utils.Props;
 import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.log4j.Logger;
 
 /**
  * This Guice module is currently a one place container for all bindings in the current module. This
@@ -28,9 +39,47 @@ import com.google.inject.AbstractModule;
  */
 public class AzkabanExecServerModule extends AbstractModule {
 
+  private static final Logger logger = Logger.getLogger(AzkabanExecServerModule.class);
+
   @Override
   protected void configure() {
     install(new ExecJettyServerModule());
     bind(ExecutorLoader.class).to(JdbcExecutorLoader.class);
   }
+
+  @Inject
+  @Provides
+  @Singleton
+  public AzkabanEventReporter createAzkabanEventReporter(final Props props) {
+    final boolean eventReporterEnabled =
+        props.getBoolean(AZKABAN_EVENT_REPORTING_ENABLED, false);
+
+    if (!eventReporterEnabled) {
+      logger.info("Event reporter is not enabled");
+      return null;
+    }
+
+    final Class<?> eventReporterClass =
+        props.getClass(AZKABAN_EVENT_REPORTING_CLASS_PARAM, null);
+    if (eventReporterClass != null && eventReporterClass.getConstructors().length > 0) {
+      this.logger.info("Loading event reporter class " + eventReporterClass.getName());
+      try {
+        final Constructor<?> eventReporterClassConstructor =
+            eventReporterClass.getConstructor(Props.class);
+        return (AzkabanEventReporter) eventReporterClassConstructor.newInstance(props);
+      } catch (final InvocationTargetException e) {
+        this.logger.error(e.getTargetException().getMessage());
+        if (e.getTargetException() instanceof IllegalArgumentException) {
+          throw new IllegalArgumentException(e);
+        } else {
+          throw new RuntimeException(e);
+        }
+      } catch (final Exception e) {
+        this.logger.error("Could not instantiate EventReporter " + eventReporterClass.getName());
+        throw new RuntimeException(e);
+      }
+    }
+    return null;
+  }
+
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
index 30f96ce..44bcc59 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
@@ -14,6 +14,7 @@ import azkaban.execapp.jmx.JmxJobCallback;
 import azkaban.execapp.jmx.JmxJobCallbackMBean;
 import azkaban.executor.Status;
 import azkaban.jobcallback.JobCallbackStatusEnum;
+import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import java.net.InetAddress;
@@ -108,9 +109,9 @@ public class JobCallbackManager implements EventListener {
 
     if (event.getRunner() instanceof JobRunner) {
       try {
-        if (event.getType() == Event.Type.JOB_STARTED) {
+        if (event.getType() == EventType.JOB_STARTED) {
           processJobCallOnStart(event);
-        } else if (event.getType() == Event.Type.JOB_FINISHED) {
+        } else if (event.getType() == EventType.JOB_FINISHED) {
           processJobCallOnFinish(event);
         }
       } catch (final Throwable e) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
index 1376249..f90470a 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -17,12 +17,12 @@
 package azkaban.execapp.event;
 
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventData;
 import azkaban.event.EventListener;
 import azkaban.execapp.FlowRunner;
 import azkaban.execapp.JobRunner;
 import azkaban.executor.ExecutableNode;
+import azkaban.spi.EventType;
 
 public class LocalFlowWatcher extends FlowWatcher {
 
@@ -58,7 +58,7 @@ public class LocalFlowWatcher extends FlowWatcher {
 
     @Override
     public void handleEvent(final Event event) {
-      if (event.getType() == Type.JOB_FINISHED) {
+      if (event.getType() == EventType.JOB_FINISHED) {
         if (event.getRunner() instanceof FlowRunner) {
           // The flow runner will finish a job without it running
           final EventData eventData = event.getData();
@@ -72,7 +72,7 @@ public class LocalFlowWatcher extends FlowWatcher {
           System.out.println(node + " looks like " + node.getStatus());
           handleJobStatusChange(node.getNestedId(), node.getStatus());
         }
-      } else if (event.getType() == Type.FLOW_FINISHED) {
+      } else if (event.getType() == EventType.FLOW_FINISHED) {
         stopWatcher();
       }
     }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 4bb2c3d..9020f30 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -16,9 +16,10 @@
 
 package azkaban.execapp;
 
+import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
+
 import azkaban.ServiceProvider;
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventData;
 import azkaban.event.EventHandler;
 import azkaban.event.EventListener;
@@ -43,6 +44,8 @@ import azkaban.metric.MetricReportManager;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
 import azkaban.sla.SlaOption;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import azkaban.utils.SwapQueue;
 import com.google.common.collect.ImmutableSet;
@@ -91,10 +94,12 @@ public class FlowRunner extends EventHandler implements Runnable {
   private final Props azkabanProps;
   private final Map<String, Props> sharedProps = new HashMap<>();
   private final JobRunnerEventListener listener = new JobRunnerEventListener();
+  private final FlowRunnerEventListener flowListener = new FlowRunnerEventListener();
   private final Set<JobRunner> activeJobRunners = Collections
       .newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
   // Thread safe swap queue for finishedExecutions.
   private final SwapQueue<ExecutableNode> finishedNodes;
+  private final AzkabanEventReporter azkabanEventReporter;
   private Logger logger;
   private Appender flowAppender;
   private File logFile;
@@ -104,13 +109,10 @@ public class FlowRunner extends EventHandler implements Runnable {
   // Used for pipelining
   private Integer pipelineLevel = null;
   private Integer pipelineExecId = null;
-
   // Watches external flows for execution.
   private FlowWatcher watcher = null;
-
   private Set<String> proxyUsers = null;
   private boolean validateUserProxy;
-
   private String jobLogFileSize = "5MB";
   private int jobLogNumFiles = 4;
   private volatile boolean flowPaused = false;
@@ -126,9 +128,10 @@ public class FlowRunner extends EventHandler implements Runnable {
    */
   public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
       final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
-      final Props azkabanProps)
+      final Props azkabanProps, final AzkabanEventReporter azkabanEventReporter)
       throws ExecutorManagerException {
-    this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps);
+    this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps,
+        azkabanEventReporter);
   }
 
   /**
@@ -136,7 +139,8 @@ public class FlowRunner extends EventHandler implements Runnable {
    */
   public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
       final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
-      final ExecutorService executorService, final Props azkabanProps)
+      final ExecutorService executorService, final Props azkabanProps,
+      final AzkabanEventReporter azkabanEventReporter)
       throws ExecutorManagerException {
     this.execId = flow.getExecutionId();
     this.flow = flow;
@@ -153,10 +157,15 @@ public class FlowRunner extends EventHandler implements Runnable {
     this.executorService = executorService;
     this.finishedNodes = new SwapQueue<>();
     this.azkabanProps = azkabanProps;
+    // Add the flow listener only if a non-null eventReporter is available.
+    if (azkabanEventReporter != null) {
+      this.addListener(this.flowListener);
+    }
 
     // Create logger and execution dir in flowRunner initialization instead of flow runtime to avoid NPE
     // where the uninitialized logger is used in flow preparing state
     createLogger(this.flow.getFlowId());
+    this.azkabanEventReporter = azkabanEventReporter;
   }
 
   public FlowRunner setFlowWatcher(final FlowWatcher watcher) {
@@ -200,7 +209,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       loadAllProperties();
 
       this.fireEventListeners(
-          Event.create(this, Type.FLOW_STARTED, new EventData(this.getExecutableFlow())));
+          Event.create(this, EventType.FLOW_STARTED, new EventData(this.getExecutableFlow())));
       runFlow();
     } catch (final Throwable t) {
       if (this.logger != null) {
@@ -225,7 +234,8 @@ public class FlowRunner extends EventHandler implements Runnable {
         closeLogger();
         updateFlow();
       } finally {
-        this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED, new EventData(this.flow)));
+        this.fireEventListeners(
+            Event.create(this, EventType.FLOW_FINISHED, new EventData(this.flow)));
       }
     }
   }
@@ -287,7 +297,6 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
   }
 
-
   /**
    * setup logger and execution dir for the flowId
    */
@@ -563,7 +572,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   private void finishExecutableNode(final ExecutableNode node) {
     this.finishedNodes.add(node);
     final EventData eventData = new EventData(node.getStatus(), node.getNestedId());
-    fireEventListeners(Event.create(this, Type.JOB_FINISHED, eventData));
+    fireEventListeners(Event.create(this, EventType.JOB_FINISHED, eventData));
   }
 
   private void finalizeFlow(final ExecutableFlowBase flow) {
@@ -1111,20 +1120,79 @@ public class FlowRunner extends EventHandler implements Runnable {
     return ImmutableSet.copyOf(this.activeJobRunners);
   }
 
+  // Class helps report the flow start and stop events.
+  private class FlowRunnerEventListener implements EventListener {
+
+    public FlowRunnerEventListener() {
+    }
+
+    private synchronized Map<String, String> getFlowMetadata(final FlowRunner flowRunner) {
+      final ExecutableFlow flow = flowRunner.getExecutableFlow();
+      final Props props = ServiceProvider.SERVICE_PROVIDER.getInstance(Props.class);
+      final Map<String, String> metaData = new HashMap<>();
+      metaData.put("flowName", flow.getId());
+      metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
+      metaData.put("projectName", flow.getProjectName());
+      metaData.put("submitUser", flow.getSubmitUser());
+      metaData.put("executionId", String.valueOf(flow.getExecutionId()));
+      metaData.put("startTime", String.valueOf(flow.getStartTime()));
+      metaData.put("submitTime", String.valueOf(flow.getSubmitTime()));
+      return metaData;
+    }
+
+    @Override
+    public synchronized void handleEvent(final Event event) {
+      if (event.getType() == EventType.FLOW_STARTED) {
+        final FlowRunner flowRunner = (FlowRunner) event.getRunner();
+        final ExecutableFlow flow = flowRunner.getExecutableFlow();
+        FlowRunner.this.logger.info("Flow started: " + flow.getId());
+        FlowRunner.this.azkabanEventReporter.report(event.getType(), getFlowMetadata(flowRunner));
+      } else if (event.getType() == EventType.FLOW_FINISHED) {
+        final FlowRunner flowRunner = (FlowRunner) event.getRunner();
+        final ExecutableFlow flow = flowRunner.getExecutableFlow();
+        FlowRunner.this.logger.info("Flow ended: " + flow.getId());
+        final Map<String, String> flowMetadata = getFlowMetadata(flowRunner);
+        flowMetadata.put("endTime", String.valueOf(flow.getEndTime()));
+        flowMetadata.put("flowStatus", flow.getStatus().name());
+        FlowRunner.this.azkabanEventReporter.report(event.getType(), flowMetadata);
+      }
+    }
+  }
+
   private class JobRunnerEventListener implements EventListener {
 
     public JobRunnerEventListener() {
     }
 
+    private synchronized Map<String, String> getJobMetadata(final JobRunner jobRunner) {
+      final ExecutableNode node = jobRunner.getNode();
+      final Props props = ServiceProvider.SERVICE_PROVIDER.getInstance(Props.class);
+      final Map<String, String> metaData = new HashMap<>();
+      metaData.put("jobId", node.getId());
+      metaData.put("executionID", String.valueOf(node.getExecutableFlow().getExecutionId()));
+      metaData.put("flowName", node.getExecutableFlow().getId());
+      metaData.put("startTime", String.valueOf(node.getStartTime()));
+      metaData.put("jobType", String.valueOf(node.getType()));
+      metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
+      metaData.put("jobProxyUser",
+          jobRunner.getProps().getString("user.to.proxy", null));
+      return metaData;
+    }
+
     @Override
     public synchronized void handleEvent(final Event event) {
-
-      if (event.getType() == Type.JOB_STATUS_CHANGED) {
+      if (event.getType() == EventType.JOB_STATUS_CHANGED) {
         updateFlow();
-      } else if (event.getType() == Type.JOB_FINISHED) {
-        final JobRunner runner = (JobRunner) event.getRunner();
-        final ExecutableNode node = runner.getNode();
+      } else if (event.getType() == EventType.JOB_FINISHED) {
         final EventData eventData = event.getData();
+        final JobRunner jobRunner = (JobRunner) event.getRunner();
+        final ExecutableNode node = jobRunner.getNode();
+        if (FlowRunner.this.azkabanEventReporter != null) {
+          final Map<String, String> jobMetadata = getJobMetadata(jobRunner);
+          jobMetadata.put("jobStatus", node.getStatus().name());
+          jobMetadata.put("endTime", String.valueOf(node.getEndTime()));
+          FlowRunner.this.azkabanEventReporter.report(event.getType(), jobMetadata);
+        }
         final long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
         synchronized (FlowRunner.this.mainSyncObj) {
           FlowRunner.this.logger.info("Job " + eventData.getNestedId() + " finished with status "
@@ -1139,12 +1207,18 @@ public class FlowRunner extends EventHandler implements Runnable {
           }
 
           FlowRunner.this.finishedNodes.add(node);
-          FlowRunner.this.activeJobRunners.remove(runner);
+          FlowRunner.this.activeJobRunners.remove(jobRunner);
           node.getParentFlow().setUpdateTime(System.currentTimeMillis());
           interrupt();
           fireEventListeners(event);
         }
-      } else if (event.getType() == Type.JOB_STARTED) {
+      } else if (event.getType() == EventType.JOB_STARTED) {
+        final EventData eventData = event.getData();
+        FlowRunner.this.logger.info("Job Started: " + eventData.getNestedId());
+        if (FlowRunner.this.azkabanEventReporter != null) {
+          final JobRunner jobRunner = (JobRunner) event.getRunner();
+          FlowRunner.this.azkabanEventReporter.report(event.getType(), getJobMetadata(jobRunner));
+        }
         // add job level checker
         final TriggerManager triggerManager = ServiceProvider.SERVICE_PROVIDER
             .getInstance(TriggerManager.class);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 50cdd4b..761e6db 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -35,6 +35,8 @@ import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectWhitelist;
 import azkaban.project.ProjectWhitelist.WhitelistType;
 import azkaban.sla.SlaOption;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
 import azkaban.storage.StorageManager;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
@@ -61,6 +63,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.apache.commons.io.FileUtils;
@@ -116,7 +119,7 @@ public class FlowRunnerManager implements EventListener,
   private final JobTypeManager jobtypeManager;
   private final FlowPreparer flowPreparer;
   private final TriggerManager triggerManager;
-
+  private final AzkabanEventReporter azkabanEventReporter;
 
   private final Props azkabanProps;
   private final File executionDirectory;
@@ -151,11 +154,13 @@ public class FlowRunnerManager implements EventListener,
       final ExecutorLoader executorLoader,
       final ProjectLoader projectLoader,
       final StorageManager storageManager,
-      final TriggerManager triggerManager) throws IOException {
+      final TriggerManager triggerManager,
+      @Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
     this.azkabanProps = props;
 
     this.executionDirRetention = props.getLong("execution.dir.retention",
         this.executionDirRetention);
+    this.azkabanEventReporter = azkabanEventReporter;
     logger.info("Execution dir retention set to " + this.executionDirRetention + " ms");
 
     this.executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
@@ -360,7 +365,7 @@ public class FlowRunnerManager implements EventListener,
 
     final FlowRunner runner =
         new FlowRunner(flow, this.executorLoader, this.projectLoader, this.jobtypeManager,
-            this.azkabanProps);
+            this.azkabanProps, this.azkabanEventReporter);
     runner.setFlowWatcher(watcher)
         .setJobLogSettings(this.jobLogChunkSize, this.jobLogNumFiles)
         .setValidateProxyUser(this.validateProxyUser)
@@ -487,16 +492,16 @@ public class FlowRunnerManager implements EventListener,
 
   @Override
   public void handleEvent(final Event event) {
-    if (event.getType() == Event.Type.FLOW_FINISHED || event.getType() == Event.Type.FLOW_STARTED) {
+    if (event.getType() == EventType.FLOW_FINISHED || event.getType() == EventType.FLOW_STARTED) {
       final FlowRunner flowRunner = (FlowRunner) event.getRunner();
       final ExecutableFlow flow = flowRunner.getExecutableFlow();
 
-      if (event.getType() == Event.Type.FLOW_FINISHED) {
+      if (event.getType() == EventType.FLOW_FINISHED) {
         this.recentlyFinishedFlows.put(flow.getExecutionId(), flow);
         logger.info("Flow " + flow.getExecutionId()
             + " is finished. Adding it to recently finished flows list.");
         this.runningFlows.remove(flow.getExecutionId());
-      } else if (event.getType() == Event.Type.FLOW_STARTED) {
+      } else if (event.getType() == EventType.FLOW_STARTED) {
         // add flow level SLA checker
         this.triggerManager
             .addTrigger(flow.getExecutionId(), SlaOption.getFlowLevelSLAOptions(flow));
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
index 461c4b1..24e2bef 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
@@ -6,6 +6,7 @@ import azkaban.event.EventListener;
 import azkaban.execapp.JobRunner;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.Status;
+import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import java.util.HashMap;
 import java.util.Map;
@@ -108,16 +109,16 @@ public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
             + eventData.getStatus());
       }
 
-      if (event.getType() == Event.Type.JOB_STARTED) {
+      if (event.getType() == EventType.JOB_STARTED) {
         this.runningJobCount.incrementAndGet();
-      } else if (event.getType() == Event.Type.JOB_FINISHED) {
+      } else if (event.getType() == EventType.JOB_FINISHED) {
         this.totalExecutedJobCount.incrementAndGet();
         if (this.runningJobCount.intValue() > 0) {
           this.runningJobCount.decrementAndGet();
         } else {
           logger.warn("runningJobCount not messed up, it is already zero "
               + "and we are trying to decrement on job event "
-              + Event.Type.JOB_FINISHED);
+              + EventType.JOB_FINISHED);
         }
 
         if (eventData.getStatus() == Status.FAILED) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index e320258..23fb470 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -18,7 +18,6 @@ package azkaban.execapp;
 
 import azkaban.Constants;
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventData;
 import azkaban.event.EventHandler;
 import azkaban.execapp.event.BlockingStatus;
@@ -34,6 +33,7 @@ import azkaban.jobExecutor.JavaProcessJob;
 import azkaban.jobExecutor.Job;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
+import azkaban.spi.EventType;
 import azkaban.utils.ExternalLinkUtils;
 import azkaban.utils.PatternLayoutEscaped;
 import azkaban.utils.Props;
@@ -412,12 +412,12 @@ public class JobRunner extends EventHandler implements Runnable {
       if (quickFinish) {
         this.node.setStartTime(time);
         fireEvent(
-            Event.create(this, Type.JOB_STARTED,
+            Event.create(this, EventType.JOB_STARTED,
                 new EventData(nodeStatus, this.node.getNestedId())));
         this.node.setEndTime(time);
         fireEvent(
             Event
-                .create(this, Type.JOB_FINISHED,
+                .create(this, EventType.JOB_FINISHED,
                     new EventData(nodeStatus, this.node.getNestedId())));
         return true;
       }
@@ -580,13 +580,13 @@ public class JobRunner extends EventHandler implements Runnable {
     Status finalStatus = this.node.getStatus();
     uploadExecutableNode();
     if (!errorFound && !isKilled()) {
-      fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(this.node)));
+      fireEvent(Event.create(this, EventType.JOB_STARTED, new EventData(this.node)));
 
       final Status prepareStatus = prepareJob();
       if (prepareStatus != null) {
         // Writes status to the db
         writeStatus();
-        fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED,
+        fireEvent(Event.create(this, EventType.JOB_STATUS_CHANGED,
             new EventData(prepareStatus, this.node.getNestedId())));
         finalStatus = runJob();
       } else {
@@ -609,7 +609,7 @@ public class JobRunner extends EventHandler implements Runnable {
         "Finishing job " + this.jobId + getNodeRetryLog() + " at " + this.node.getEndTime()
             + " with status " + this.node.getStatus());
 
-    fireEvent(Event.create(this, Type.JOB_FINISHED,
+    fireEvent(Event.create(this, EventType.JOB_FINISHED,
         new EventData(finalStatus, this.node.getNestedId())), false);
     finalizeLogFile(this.node.getAttempt());
     finalizeAttachmentFile();
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
index 6116583..b48b85f 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -17,13 +17,13 @@
 package azkaban.execapp.metric;
 
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
 import azkaban.execapp.FlowRunner;
 import azkaban.executor.Status;
 import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.spi.EventType;
 
 /**
  * Metric to keep track of number of failed flows in between the tracking events
@@ -47,7 +47,7 @@ public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> imple
    */
   @Override
   public synchronized void handleEvent(final Event event) {
-    if (event.getType() == Type.FLOW_FINISHED) {
+    if (event.getType() == EventType.FLOW_FINISHED) {
       final FlowRunner runner = (FlowRunner) event.getRunner();
       if (runner != null && runner.getExecutableFlow().getStatus().equals(Status.FAILED)) {
         this.value = this.value + 1;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
index b069b0b..7ad0a46 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -17,12 +17,12 @@
 package azkaban.execapp.metric;
 
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
 import azkaban.executor.Status;
 import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.spi.EventType;
 
 /**
  * Metric to keep track of number of failed jobs in between the tracking events
@@ -45,7 +45,8 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
    */
   @Override
   public synchronized void handleEvent(final Event event) {
-    if (event.getType() == Type.JOB_FINISHED && Status.FAILED.equals(event.getData().getStatus())) {
+    if (event.getType() == EventType.JOB_FINISHED && Status.FAILED
+        .equals(event.getData().getStatus())) {
       this.value = this.value + 1;
     }
   }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index 5be6b68..a6ac7ee 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -17,11 +17,11 @@
 package azkaban.execapp.metric;
 
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
 import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
+import azkaban.spi.EventType;
 
 /**
  * Metric to keep track of number of running jobs in Azkaban exec server
@@ -49,9 +49,9 @@ public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> imple
    */
   @Override
   public synchronized void handleEvent(final Event event) {
-    if (event.getType() == Type.JOB_STARTED) {
+    if (event.getType() == EventType.JOB_STARTED) {
       this.value = this.value + 1;
-    } else if (event.getType() == Type.JOB_FINISHED) {
+    } else if (event.getType() == EventType.JOB_FINISHED) {
       this.value = this.value - 1;
     }
   }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecServerModuleTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecServerModuleTest.java
new file mode 100644
index 0000000..df48c28
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecServerModuleTest.java
@@ -0,0 +1,111 @@
+package azkaban.execapp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
+import azkaban.utils.Props;
+import java.util.Map;
+import org.junit.Test;
+
+public class AzkabanExecServerModuleTest {
+
+  /**
+   * Verify that alternate implementation of the <code>AzkabanEventReporter</code>
+   * is initialized.
+   */
+  @Test
+  public void testCreateAzkabanEventReporter() {
+    final AzkabanExecServerModule azkabanExecServerModule = new AzkabanExecServerModule();
+    final Props props = new Props();
+    props.put("azkaban.event.reporting.enabled", "true");
+    props.put("azkaban.event.reporting.class",
+        "azkaban.execapp.AzkabanEventReporterTest1");
+    final AzkabanEventReporter azkabanEventReporter = azkabanExecServerModule
+        .createAzkabanEventReporter(props);
+    assertThat(azkabanEventReporter).isNotNull();
+    assertThat(azkabanEventReporter).isInstanceOf(AzkabanEventReporterTest1.class);
+  }
+
+  /**
+   * Verify that <code>IllegalArgumentException</code> is thrown when required properties
+   * are missing.
+   */
+  @Test
+  public void testAzkabanEventReporterInvalidProperties() {
+    final AzkabanExecServerModule azkabanExecServerModule = new AzkabanExecServerModule();
+    final Props props = new Props();
+    props.put("azkaban.event.reporting.enabled", "true");
+    props.put("azkaban.event.reporting.class",
+        "azkaban.execapp.reporter.AzkabanKafkaAvroEventReporter");
+    assertThatIllegalArgumentException()
+        .isThrownBy(() -> azkabanExecServerModule.createAzkabanEventReporter(props));
+  }
+
+  /**
+   * Verify that a <code>RuntimeException</code> is thrown when valid constructor is
+   * not found in the event reporter implementation.
+   */
+  @Test
+  public void testAzkabanEventReporterInvalidConstructor() {
+    final AzkabanExecServerModule azkabanExecServerModule = new AzkabanExecServerModule();
+    final Props props = new Props();
+    props.put("azkaban.event.reporting.enabled", "true");
+    props.put("azkaban.event.reporting.class",
+        "azkaban.execapp.AzkabanEventReporterTest3");
+    assertThatExceptionOfType(RuntimeException.class)
+        .isThrownBy(() -> azkabanExecServerModule.createAzkabanEventReporter(props));
+  }
+
+  /**
+   * Ensures that the event reporter is not initialized when the property 'event.reporter.enabled'
+   * is not set.
+   */
+  @Test
+  public void testEventReporterDisabled() {
+    final AzkabanExecServerModule azkabanExecServerModule = new AzkabanExecServerModule();
+    final AzkabanEventReporter azkabanEventReporter = azkabanExecServerModule
+        .createAzkabanEventReporter(new Props());
+    assertThat(azkabanEventReporter).isNull();
+  }
+
+}
+
+// Dummy implementation of the AzkabanEventReporter interface
+// with valid constructor.
+class AzkabanEventReporterTest1 implements AzkabanEventReporter {
+
+  public AzkabanEventReporterTest1(final Props props) {
+
+  }
+
+  @Override
+  public boolean report(final EventType eventType, final Map<String, String> metadata) {
+    return false;
+  }
+}
+
+// Dummy implementation of the AzkabanEventReporter interface, for test.
+// Valid constructor is not available.
+class AzkabanEventReporterTest2 implements AzkabanEventReporter {
+
+  @Override
+  public boolean report(final EventType eventType, final Map<String, String> metadata) {
+    return false;
+  }
+}
+
+// Dummy implementation of the AzkabanEventReporter with an invalid constructor
+class AzkabanEventReporterTest3 implements AzkabanEventReporter {
+
+  public AzkabanEventReporterTest3() {
+
+  }
+
+  @Override
+  public boolean report(final EventType eventType, final Map<String, String> metadata) {
+    return false;
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
index 681acd5..2244ade 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
@@ -30,6 +30,7 @@ import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
@@ -42,6 +43,7 @@ import org.junit.Test;
 
 public class LocalFlowWatcherTest {
 
+  private final AzkabanEventReporter azkabanEventReporter = null;
   private JobTypeManager jobtypeManager;
   private int dirVal = 0;
 
@@ -160,14 +162,14 @@ public class LocalFlowWatcherTest {
 
   private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
 
       // check it's start time is after the first's children.
       final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
       if (watchedNode == null) {
         continue;
       }
-      Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
 
       System.out.println("Node " + node.getId() + " start: "
           + node.getStartTime() + " dependent on " + watchedNode.getId() + " "
@@ -194,14 +196,14 @@ public class LocalFlowWatcherTest {
 
   private void testPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
 
       // check it's start time is after the first's children.
       final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
       if (watchedNode == null) {
         continue;
       }
-      Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
 
       long minDiff = Long.MAX_VALUE;
       for (final String watchedChild : watchedNode.getOutNodes()) {
@@ -209,7 +211,7 @@ public class LocalFlowWatcherTest {
         if (child == null) {
           continue;
         }
-        Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
+        Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
         final long diff = node.getStartTime() - child.getEndTime();
         minDiff = Math.min(minDiff, diff);
         System.out.println("Node " + node.getId() + " start: "
@@ -252,7 +254,7 @@ public class LocalFlowWatcherTest {
     }
     loader.uploadExecutableFlow(exFlow);
     final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
-        this.jobtypeManager, azkabanProps);
+        this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
     runner.setFlowWatcher(watcher);
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index e5130b2..49f443d 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -32,6 +32,7 @@ import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
@@ -46,6 +47,7 @@ import org.junit.rules.TemporaryFolder;
 
 public class RemoteFlowWatcherTest {
 
+  private final AzkabanEventReporter azkabanEventReporter = null;
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private JobTypeManager jobtypeManager;
@@ -182,14 +184,14 @@ public class RemoteFlowWatcherTest {
 
   private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
 
       // check it's start time is after the first's children.
       final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
       if (watchedNode == null) {
         continue;
       }
-      Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
 
       System.out.println("Node " + node.getId() + " start: "
           + node.getStartTime() + " dependent on " + watchedNode.getId() + " "
@@ -215,7 +217,7 @@ public class RemoteFlowWatcherTest {
   private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second,
       final boolean job4Skipped) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
+      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
 
       // check it's start time is after the first's children.
       final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
@@ -275,7 +277,7 @@ public class RemoteFlowWatcherTest {
 
     loader.uploadExecutableFlow(exFlow);
     final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
-        this.jobtypeManager, azkabanProps);
+        this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
     runner.setFlowWatcher(watcher);
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java b/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
index adf17e0..28fb57a 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
@@ -19,8 +19,8 @@ package azkaban.execapp;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
+import azkaban.spi.EventType;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -31,9 +31,9 @@ public class EventCollectorListener implements EventListener {
   public static final Object handleEvent = new Object();
   // CopyOnWriteArrayList allows concurrent iteration and modification
   private final List<Event> eventList = new CopyOnWriteArrayList<>();
-  private final HashSet<Event.Type> filterOutTypes = new HashSet<>();
+  private final HashSet<EventType> filterOutTypes = new HashSet<>();
 
-  public void setEventFilterOut(final Event.Type... types) {
+  public void setEventFilterOut(final EventType... types) {
     this.filterOutTypes.addAll(Arrays.asList(types));
   }
 
@@ -62,7 +62,7 @@ public class EventCollectorListener implements EventListener {
     return true;
   }
 
-  public void assertEvents(final Type... expected) {
+  public void assertEvents(final EventType... expected) {
     final Object[] captured = this.eventList.stream()
         .filter(event -> event.getRunner() != null)
         .map(event -> event.getType())
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 0439510..8b37f50 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -35,6 +35,7 @@ import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.spi.AzkabanEventReporter;
 import azkaban.utils.Props;
 import java.io.File;
 import java.util.HashMap;
@@ -57,6 +58,7 @@ import org.junit.rules.TemporaryFolder;
 public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
   private static int id = 101;
+  private final AzkabanEventReporter azkabanEventReporter = null;
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private File workingDir;
@@ -575,8 +577,8 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
     final FlowRunner runner =
         new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
-            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
-
+            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
+            this.azkabanEventReporter);
     runner.addListener(eventCollector);
 
     return runner;
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 367cb4e..ff82221 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -29,6 +29,7 @@ import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
@@ -59,6 +60,7 @@ import org.junit.Test;
 public class FlowRunnerPropertyResolutionTest {
 
   private static int id = 101;
+  private final AzkabanEventReporter azkabanEventReporter = null;
   private File workingDir;
   private JobTypeManager jobtypeManager;
   private ExecutorLoader fakeExecutorLoader;
@@ -213,7 +215,8 @@ public class FlowRunnerPropertyResolutionTest {
 
     final FlowRunner runner =
         new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
-            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
+            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
+            this.azkabanEventReporter);
     return runner;
   }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index 73fd90f..64128df 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -20,8 +20,6 @@ import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.when;
 
-import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
@@ -33,6 +31,8 @@ import azkaban.jobExecutor.AllJobExecutorTests;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
+import azkaban.spi.EventType;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
@@ -48,6 +48,7 @@ import org.mockito.MockitoAnnotations;
 public class FlowRunnerTest extends FlowRunnerTestBase {
 
   private static final File TEST_DIR = ExecutionsTestUtil.getFlowDir("exectest1");
+  private final AzkabanEventReporter azkabanEventReporter = null;
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private File workingDir;
@@ -76,8 +77,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
   @Test
   public void exec1Normal() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
 
     startThread(this.runner);
@@ -97,14 +98,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job8", Status.SUCCEEDED);
     assertStatus("job10", Status.SUCCEEDED);
 
-    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
 
   @Test
   public void exec1Disabled() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     final ExecutableFlow exFlow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, TEST_DIR, "exec1", 1);
 
@@ -137,14 +138,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job8", Status.SUCCEEDED);
     assertStatus("job10", Status.SKIPPED);
 
-    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
 
   @Test
   public void exec1Failed() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     final ExecutableFlow flow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
 
@@ -168,14 +169,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job10", Status.CANCELLED);
     assertThreadShutDown();
 
-    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
 
   @Test
   public void exec1FailedKillAll() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     final ExecutableFlow flow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
     flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
@@ -200,14 +201,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job9", Status.CANCELLED);
     assertStatus("job10", Status.CANCELLED);
 
-    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
 
   @Test
   public void exec1FailedFinishRest() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     final ExecutableFlow flow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, TEST_DIR, "exec3", 1);
     flow.getExecutionOptions().setFailureAction(
@@ -231,14 +232,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job10", Status.CANCELLED);
     assertThreadShutDown();
 
-    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
 
   @Test
   public void execAndCancel() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
 
     startThread(this.runner);
@@ -266,14 +267,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
 
     waitForAndAssertFlowStatus(Status.KILLED);
 
-    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
+    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
 
   @Test
   public void execRetries() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
-        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
+        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
     this.runner = createFlowRunner(this.loader, eventCollector, "exec4-retry");
 
     startThread(this.runner);
@@ -345,7 +346,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
 
     loader.uploadExecutableFlow(flow);
     final FlowRunner runner =
-        new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps);
+        new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
+            this.azkabanEventReporter);
 
     runner.addListener(eventCollector);
 
@@ -366,7 +368,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     loader.uploadExecutableFlow(exFlow);
 
     final FlowRunner runner =
-        new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps);
+        new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
+            this.azkabanEventReporter);
 
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index e5b9817..0790935 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -35,6 +35,7 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanEventReporter;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
@@ -95,6 +96,7 @@ import org.junit.rules.TemporaryFolder;
 public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
   private static int id = 101;
+  private final AzkabanEventReporter azkabanEventReporter = null;
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private File workingDir;
@@ -1127,8 +1129,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
     final FlowRunner runner = new FlowRunner(
         this.fakeExecutorLoader.fetchExecutableFlow(exId), this.fakeExecutorLoader,
-        mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
-
+        mock(ProjectLoader.class), this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
     runner.addListener(eventCollector);
 
     return runner;
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index 53cf411..ea46cef 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -19,7 +19,6 @@ package azkaban.execapp;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import azkaban.event.Event;
-import azkaban.event.Event.Type;
 import azkaban.event.EventData;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
@@ -30,6 +29,7 @@ import azkaban.executor.Status;
 import azkaban.jobExecutor.ProcessJob;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
+import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
@@ -83,12 +83,12 @@ public class JobRunnerTest {
         createJobRunner(1, "testJob", 0, false, loader, eventCollector);
     final ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
         || runner.getStatus() != Status.FAILED);
 
     runner.run();
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -103,7 +103,8 @@ public class JobRunnerTest {
 
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 
-    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
+    eventCollector
+        .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
   }
 
   @Ignore
@@ -132,7 +133,8 @@ public class JobRunnerTest {
     Assert.assertTrue(!runner.isKilled());
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 
-    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
+    eventCollector
+        .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
   }
 
   @Test
@@ -163,7 +165,7 @@ public class JobRunnerTest {
 
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
 
-    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_FINISHED);
+    eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
   }
 
   @Test
@@ -194,7 +196,7 @@ public class JobRunnerTest {
     Assert.assertTrue(outputProps == null);
     Assert.assertTrue(runner.getLogFilePath() == null);
     Assert.assertTrue(!runner.isKilled());
-    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_FINISHED);
+    eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
   }
 
   @Ignore
@@ -233,7 +235,8 @@ public class JobRunnerTest {
     Assert.assertTrue(logFile.exists());
     Assert.assertTrue(eventCollector.checkOrdering());
     Assert.assertTrue(runner.isKilled());
-    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
+    eventCollector
+        .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
   }
 
   @Ignore
@@ -247,11 +250,11 @@ public class JobRunnerTest {
     final long startTime = System.currentTimeMillis();
     final ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
     runner.run();
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -268,7 +271,8 @@ public class JobRunnerTest {
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 
     Assert.assertTrue(eventCollector.checkOrdering());
-    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
+    eventCollector
+        .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
   }
 
   @Test
@@ -281,7 +285,7 @@ public class JobRunnerTest {
     final long startTime = System.currentTimeMillis();
     final ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
     final Thread thread = new Thread(runner);
@@ -293,7 +297,7 @@ public class JobRunnerTest {
     runner.kill();
     StatusTestUtils.waitForStatus(node, Status.KILLED);
 
-    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -312,7 +316,7 @@ public class JobRunnerTest {
     // wait so that there's time to make the "DB update" for KILLED status
     azkaban.test.TestUtils.await().untilAsserted(
         () -> assertThat(loader.getNodeUpdateCount("testJob")).isEqualTo(2));
-    eventCollector.assertEvents(Event.Type.JOB_FINISHED);
+    eventCollector.assertEvents(EventType.JOB_FINISHED);
   }
 
   private Props createProps(final int sleepSec, final boolean fail) {
diff --git a/azkaban-spi/src/main/java/azkaban/spi/AzkabanEventReporter.java b/azkaban-spi/src/main/java/azkaban/spi/AzkabanEventReporter.java
new file mode 100644
index 0000000..73c46d1
--- /dev/null
+++ b/azkaban-spi/src/main/java/azkaban/spi/AzkabanEventReporter.java
@@ -0,0 +1,19 @@
+package azkaban.spi;
+
+import java.util.Map;
+
+/**
+ * Implement this interface to report flow and job events. Event reporter
+ * can be turned on by setting the property {@code AZKABAN_EVENT_REPORTING_ENABLED} to true.
+ *
+ * By default, a KafkaAvroEventReporter is provided. Alternate implementations
+ * can be provided by setting the property {@code AZKABAN_EVENT_REPORTING_CLASS_PARAM}
+ * <br><br>
+ * The constructor will be called with a {@code azkaban.utils.Props} object passed as
+ * the only parameter. If such a constructor doesn't exist, then the AzkabanEventReporter
+ * instantiation will fail.
+ */
+public interface AzkabanEventReporter {
+
+  boolean report(EventType eventType, Map<String, String> metadata);
+}
diff --git a/azkaban-spi/src/main/java/azkaban/spi/EventType.java b/azkaban-spi/src/main/java/azkaban/spi/EventType.java
new file mode 100644
index 0000000..71f04f7
--- /dev/null
+++ b/azkaban-spi/src/main/java/azkaban/spi/EventType.java
@@ -0,0 +1,14 @@
+package azkaban.spi;
+
+/**
+ * Enum class defining the list of supported event types.
+ */
+public enum EventType {
+  FLOW_STARTED,
+  FLOW_FINISHED,
+  JOB_STARTED,
+  JOB_FINISHED,
+  JOB_STATUS_CHANGED,
+  EXTERNAL_FLOW_UPDATED,
+  EXTERNAL_JOB_UPDATED
+}

build.gradle 2(+1 -1)

diff --git a/build.gradle b/build.gradle
index a152178..d179314 100644
--- a/build.gradle
+++ b/build.gradle
@@ -88,6 +88,7 @@ ext.deps = [
         jexl                : 'org.apache.commons:commons-jexl:2.1.1',
         jodaTime            : 'joda-time:joda-time:2.0',
         jopt                : 'net.sf.jopt-simple:jopt-simple:4.3',
+        jsr305              : 'com.google.code.findbugs:jsr305:3.0.2',
         junit               : 'junit:junit:4.12',
         kafkaLog4jAppender  : 'org.apache.kafka:kafka-log4j-appender:0.10.0.0',
         log4j               : 'log4j:log4j:1.2.16',
@@ -149,7 +150,6 @@ subprojects {
             compile deps.log4j
             compile deps.guice
             compile deps.slf4j
-
             runtime deps.slf4jLog4j
 
             testCompile deps.assertj