azkaban-aplcache

Revert "Event reporting for azkaban events. (#1332)" (#1517) This

9/29/2017 2:13:37 AM

Changes

azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaAvroEventReporter.java 124(+0 -124)

azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaPusher.java 30(+0 -30)

azkaban-exec-server/src/test/java/azkaban/execapp/AzkabanExecServerModuleTest.java 111(+0 -111)

azkaban-exec-server/src/test/java/azkaban/execapp/EventReporterUtil.java 37(+0 -37)

azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanEventReporterTest.java 64(+0 -64)

azkaban-exec-server/src/test/java/azkaban/execapp/reporter/AzkabanKafkaPusherTest.java 19(+0 -19)

azkaban-spi/src/main/java/azkaban/spi/AzkabanEventReporter.java 19(+0 -19)

azkaban-spi/src/main/java/azkaban/spi/EventType.java 14(+0 -14)

build.gradle 5(+1 -4)

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 53e006c..77d2151 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -124,17 +124,6 @@ public class Constants {
     public static final String AZKABAN_KEYTAB_PATH = "azkaban.keytab.path";
     public static final String PROJECT_TEMP_DIR = "project.temp.dir";
 
-    // Event reporting properties
-    public static final String AZKABAN_EVENT_REPORTING_CLASS_PARAM =
-        "azkaban.event.reporting.class";
-    public static final String AZKABAN_EVENT_REPORTING_ENABLED = "azkaban.event.reporting.enabled";
-    public static final String AZKABAN_EVENT_REPORTING_KAFKA_BROKERS =
-        "azkaban.event.reporting.kafka.brokers";
-    public static final String AZKABAN_EVENT_REPORTING_KAFKA_TOPIC =
-        "azkaban.event.reporting.kafka.topic";
-    public static final String AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL =
-        "azkaban.event.reporting.kafka.schema.registry.url";
-
     /*
      * The max number of artifacts retained per project.
      * Accepted Values:
diff --git a/azkaban-common/src/main/java/azkaban/event/Event.java b/azkaban-common/src/main/java/azkaban/event/Event.java
index 832dd99..fc79b4a 100644
--- a/azkaban-common/src/main/java/azkaban/event/Event.java
+++ b/azkaban-common/src/main/java/azkaban/event/Event.java
@@ -16,17 +16,16 @@
 
 package azkaban.event;
 
-import azkaban.spi.EventType;
 import com.google.common.base.Preconditions;
 
 public class Event {
 
   private final Object runner;
-  private final EventType type;
+  private final Type type;
   private final EventData eventData;
   private final long time;
 
-  private Event(final Object runner, final EventType type, final EventData eventData) {
+  private Event(final Object runner, final Type type, final EventData eventData) {
     this.runner = runner;
     this.type = type;
     this.eventData = eventData;
@@ -42,7 +41,7 @@ public class Event {
    * @return New Event instance.
    * @throws NullPointerException if EventData is null.
    */
-  public static Event create(final Object runner, final EventType type, final EventData eventData)
+  public static Event create(final Object runner, final Type type, final EventData eventData)
       throws NullPointerException {
     Preconditions.checkNotNull(eventData, "EventData was null");
     return new Event(runner, type, eventData);
@@ -52,7 +51,7 @@ public class Event {
     return this.runner;
   }
 
-  public EventType getType() {
+  public Type getType() {
     return this.type;
   }
 
@@ -64,5 +63,14 @@ public class Event {
     return this.eventData;
   }
 
+  public enum Type {
+    FLOW_STARTED,
+    FLOW_FINISHED,
+    JOB_STARTED,
+    JOB_FINISHED,
+    JOB_STATUS_CHANGED,
+    EXTERNAL_FLOW_UPDATED,
+    EXTERNAL_JOB_UPDATED
+  }
 
 }
diff --git a/azkaban-exec-server/build.gradle b/azkaban-exec-server/build.gradle
index 909088f..1d35d4d 100644
--- a/azkaban-exec-server/build.gradle
+++ b/azkaban-exec-server/build.gradle
@@ -5,10 +5,6 @@ dependencies {
     compile(project(':azkaban-common'))
 
     compile deps.kafkaLog4jAppender
-    compile(deps.gobblinKafka) {
-        exclude group: 'org.apache.hadoop'
-    }
-
     runtime(project(':azkaban-hadoop-security-plugin'))
 
     testCompile(project(path: ':azkaban-common', configuration: 'testCompile'))
@@ -21,16 +17,6 @@ dependencies {
     testCompile deps.hadoopHdfs
 }
 
-configurations.compile {
-    exclude group: 'com.linkedin.gobblin', module: 'gobblin-api'
-    exclude group: 'com.linkedin.gobblin', module: 'gobblin-metrics-graphite'
-    exclude group: 'com.linkedin.gobblin', module: 'gobblin-metrics-hadoop'
-    exclude group: 'com.linkedin.gobblin', module: 'gobblin-metrics-influxdb'
-    exclude group: 'com.linkedin.gobblin', module: 'gobblin-runtime'
-
-    exclude group: 'org.projectlombok', module: 'lombok'
-}
-
 distributions {
     main {
         contents {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index 75f84ef..683ca35 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -17,21 +17,9 @@
 
 package azkaban.execapp;
 
-import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_CLASS_PARAM;
-import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_ENABLED;
-
-import azkaban.execapp.reporter.AzkabanKafkaAvroEventReporter;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.JdbcExecutorLoader;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.utils.Props;
 import com.google.inject.AbstractModule;
-import com.google.inject.Provides;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import javax.inject.Inject;
-import javax.inject.Singleton;
-import org.apache.log4j.Logger;
 
 /**
  * This Guice module is currently a one place container for all bindings in the current module. This
@@ -40,47 +28,9 @@ import org.apache.log4j.Logger;
  */
 public class AzkabanExecServerModule extends AbstractModule {
 
-  private static final Logger logger = Logger.getLogger(AzkabanExecServerModule.class);
-
   @Override
   protected void configure() {
     install(new ExecJettyServerModule());
     bind(ExecutorLoader.class).to(JdbcExecutorLoader.class);
   }
-
-  @Inject
-  @Provides
-  @Singleton
-  public AzkabanEventReporter createAzkabanEventReporter(final Props props) {
-    final boolean eventReporterEnabled =
-        props.getBoolean(AZKABAN_EVENT_REPORTING_ENABLED, false);
-
-    if (!eventReporterEnabled) {
-      logger.info("Event reporter is not enabled");
-      return null;
-    }
-
-    final Class<?> eventReporterClass =
-        props.getClass(AZKABAN_EVENT_REPORTING_CLASS_PARAM, AzkabanKafkaAvroEventReporter.class);
-    if (eventReporterClass != null && eventReporterClass.getConstructors().length > 0) {
-      this.logger.info("Loading event reporter class " + eventReporterClass.getName());
-      try {
-        final Constructor<?> eventReporterClassConstructor =
-            eventReporterClass.getConstructor(Props.class);
-        return (AzkabanEventReporter) eventReporterClassConstructor.newInstance(props);
-      } catch (final InvocationTargetException e) {
-        this.logger.error(e.getTargetException().getMessage());
-        if (e.getTargetException() instanceof IllegalArgumentException) {
-          throw new IllegalArgumentException(e);
-        } else {
-          throw new RuntimeException(e);
-        }
-      } catch (final Exception e) {
-        this.logger.error("Could not instantiate EventReporter " + eventReporterClass.getName());
-        throw new RuntimeException(e);
-      }
-    }
-    return null;
-  }
-
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
index 44bcc59..30f96ce 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
@@ -14,7 +14,6 @@ import azkaban.execapp.jmx.JmxJobCallback;
 import azkaban.execapp.jmx.JmxJobCallbackMBean;
 import azkaban.executor.Status;
 import azkaban.jobcallback.JobCallbackStatusEnum;
-import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import java.net.InetAddress;
@@ -109,9 +108,9 @@ public class JobCallbackManager implements EventListener {
 
     if (event.getRunner() instanceof JobRunner) {
       try {
-        if (event.getType() == EventType.JOB_STARTED) {
+        if (event.getType() == Event.Type.JOB_STARTED) {
           processJobCallOnStart(event);
-        } else if (event.getType() == EventType.JOB_FINISHED) {
+        } else if (event.getType() == Event.Type.JOB_FINISHED) {
           processJobCallOnFinish(event);
         }
       } catch (final Throwable e) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
index f90470a..1376249 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -17,12 +17,12 @@
 package azkaban.execapp.event;
 
 import azkaban.event.Event;
+import azkaban.event.Event.Type;
 import azkaban.event.EventData;
 import azkaban.event.EventListener;
 import azkaban.execapp.FlowRunner;
 import azkaban.execapp.JobRunner;
 import azkaban.executor.ExecutableNode;
-import azkaban.spi.EventType;
 
 public class LocalFlowWatcher extends FlowWatcher {
 
@@ -58,7 +58,7 @@ public class LocalFlowWatcher extends FlowWatcher {
 
     @Override
     public void handleEvent(final Event event) {
-      if (event.getType() == EventType.JOB_FINISHED) {
+      if (event.getType() == Type.JOB_FINISHED) {
         if (event.getRunner() instanceof FlowRunner) {
           // The flow runner will finish a job without it running
           final EventData eventData = event.getData();
@@ -72,7 +72,7 @@ public class LocalFlowWatcher extends FlowWatcher {
           System.out.println(node + " looks like " + node.getStatus());
           handleJobStatusChange(node.getNestedId(), node.getStatus());
         }
-      } else if (event.getType() == EventType.FLOW_FINISHED) {
+      } else if (event.getType() == Type.FLOW_FINISHED) {
         stopWatcher();
       }
     }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index d5ac66f..76be252 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -16,10 +16,9 @@
 
 package azkaban.execapp;
 
-import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
-
 import azkaban.ServiceProvider;
 import azkaban.event.Event;
+import azkaban.event.Event.Type;
 import azkaban.event.EventData;
 import azkaban.event.EventHandler;
 import azkaban.event.EventListener;
@@ -44,8 +43,6 @@ import azkaban.metric.MetricReportManager;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
 import azkaban.sla.SlaOption;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import azkaban.utils.SwapQueue;
 import com.google.common.collect.ImmutableSet;
@@ -94,12 +91,10 @@ public class FlowRunner extends EventHandler implements Runnable {
   private final Props azkabanProps;
   private final Map<String, Props> sharedProps = new HashMap<>();
   private final JobRunnerEventListener listener = new JobRunnerEventListener();
-  private final FlowRunnerEventListener flowListener = new FlowRunnerEventListener();
   private final Set<JobRunner> activeJobRunners = Collections
       .newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
   // Thread safe swap queue for finishedExecutions.
   private final SwapQueue<ExecutableNode> finishedNodes;
-  private final AzkabanEventReporter azkabanEventReporter;
   private Logger logger;
   private Appender flowAppender;
   private File logFile;
@@ -109,16 +104,21 @@ public class FlowRunner extends EventHandler implements Runnable {
   // Used for pipelining
   private Integer pipelineLevel = null;
   private Integer pipelineExecId = null;
+
   // Watches external flows for execution.
   private FlowWatcher watcher = null;
+
   private Set<String> proxyUsers = null;
   private boolean validateUserProxy;
+
   private String jobLogFileSize = "5MB";
   private int jobLogNumFiles = 4;
+
   private boolean flowPaused = false;
   private boolean flowFailed = false;
   private boolean flowFinished = false;
   private boolean flowKilled = false;
+
   // The following is state that will trigger a retry of all failed jobs
   private boolean retryFailedJobs = false;
 
@@ -127,10 +127,9 @@ public class FlowRunner extends EventHandler implements Runnable {
    */
   public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
       final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
-      final Props azkabanProps, final AzkabanEventReporter azkabanEventReporter)
+      final Props azkabanProps)
       throws ExecutorManagerException {
-    this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps,
-        azkabanEventReporter);
+    this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps);
   }
 
   /**
@@ -138,8 +137,7 @@ public class FlowRunner extends EventHandler implements Runnable {
    */
   public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
       final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
-      final ExecutorService executorService, final Props azkabanProps,
-      final AzkabanEventReporter azkabanEventReporter)
+      final ExecutorService executorService, final Props azkabanProps)
       throws ExecutorManagerException {
     this.execId = flow.getExecutionId();
     this.flow = flow;
@@ -156,15 +154,10 @@ public class FlowRunner extends EventHandler implements Runnable {
     this.executorService = executorService;
     this.finishedNodes = new SwapQueue<>();
     this.azkabanProps = azkabanProps;
-    // Add the flow listener only if a non-null eventReporter is available.
-    if (azkabanEventReporter != null) {
-      this.addListener(this.flowListener);
-    }
 
     // Create logger and execution dir in flowRunner initialization instead of flow runtime to avoid NPE
     // where the uninitialized logger is used in flow preparing state
     createLogger(this.flow.getFlowId());
-    this.azkabanEventReporter = azkabanEventReporter;
   }
 
   public FlowRunner setFlowWatcher(final FlowWatcher watcher) {
@@ -208,7 +201,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       loadAllProperties();
 
       this.fireEventListeners(
-          Event.create(this, EventType.FLOW_STARTED, new EventData(this.getExecutableFlow())));
+          Event.create(this, Type.FLOW_STARTED, new EventData(this.getExecutableFlow())));
       runFlow();
     } catch (final Throwable t) {
       if (this.logger != null) {
@@ -233,8 +226,7 @@ public class FlowRunner extends EventHandler implements Runnable {
         closeLogger();
         updateFlow();
       } finally {
-        this.fireEventListeners(
-            Event.create(this, EventType.FLOW_FINISHED, new EventData(this.flow)));
+        this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED, new EventData(this.flow)));
       }
     }
   }
@@ -296,6 +288,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
   }
 
+
   /**
    * setup logger and execution dir for the flowId
    */
@@ -566,7 +559,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   private void finishExecutableNode(final ExecutableNode node) {
     this.finishedNodes.add(node);
     final EventData eventData = new EventData(node.getStatus(), node.getNestedId());
-    fireEventListeners(Event.create(this, EventType.JOB_FINISHED, eventData));
+    fireEventListeners(Event.create(this, Type.JOB_FINISHED, eventData));
   }
 
   private void finalizeFlow(final ExecutableFlowBase flow) {
@@ -1106,79 +1099,20 @@ public class FlowRunner extends EventHandler implements Runnable {
     return ImmutableSet.copyOf(this.activeJobRunners);
   }
 
-  // Class helps report the flow start and stop events.
-  private class FlowRunnerEventListener implements EventListener {
-
-    public FlowRunnerEventListener() {
-    }
-
-    private synchronized Map<String, String> getFlowMetadata(final FlowRunner flowRunner) {
-      final ExecutableFlow flow = flowRunner.getExecutableFlow();
-      final Props props = ServiceProvider.SERVICE_PROVIDER.getInstance(Props.class);
-      final Map<String, String> metaData = new HashMap<>();
-      metaData.put("flowName", flow.getId());
-      metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
-      metaData.put("projectName", flow.getProjectName());
-      metaData.put("submitUser", flow.getSubmitUser());
-      metaData.put("executionId", String.valueOf(flow.getExecutionId()));
-      metaData.put("startTime", String.valueOf(flow.getStartTime()));
-      metaData.put("submitTime", String.valueOf(flow.getSubmitTime()));
-      return metaData;
-    }
-
-    @Override
-    public synchronized void handleEvent(final Event event) {
-      if (event.getType() == EventType.FLOW_STARTED) {
-        final FlowRunner flowRunner = (FlowRunner) event.getRunner();
-        final ExecutableFlow flow = flowRunner.getExecutableFlow();
-        FlowRunner.this.logger.info("Flow started: " + flow.getId());
-        FlowRunner.this.azkabanEventReporter.report(event.getType(), getFlowMetadata(flowRunner));
-      } else if (event.getType() == EventType.FLOW_FINISHED) {
-        final FlowRunner flowRunner = (FlowRunner) event.getRunner();
-        final ExecutableFlow flow = flowRunner.getExecutableFlow();
-        FlowRunner.this.logger.info("Flow ended: " + flow.getId());
-        final Map<String, String> flowMetadata = getFlowMetadata(flowRunner);
-        flowMetadata.put("endTime", String.valueOf(flow.getEndTime()));
-        flowMetadata.put("flowStatus", flow.getStatus().name());
-        FlowRunner.this.azkabanEventReporter.report(event.getType(), flowMetadata);
-      }
-    }
-  }
-
   private class JobRunnerEventListener implements EventListener {
 
     public JobRunnerEventListener() {
     }
 
-    private synchronized Map<String, String> getJobMetadata(final JobRunner jobRunner) {
-      final ExecutableNode node = jobRunner.getNode();
-      final Props props = ServiceProvider.SERVICE_PROVIDER.getInstance(Props.class);
-      final Map<String, String> metaData = new HashMap<>();
-      metaData.put("jobId", node.getId());
-      metaData.put("executionID", String.valueOf(node.getExecutableFlow().getExecutionId()));
-      metaData.put("flowName", node.getExecutableFlow().getId());
-      metaData.put("startTime", String.valueOf(node.getStartTime()));
-      metaData.put("jobType", String.valueOf(node.getType()));
-      metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
-      metaData.put("jobProxyUser",
-          jobRunner.getProps().getString("user.to.proxy", null));
-      return metaData;
-    }
-
     @Override
     public synchronized void handleEvent(final Event event) {
-      if (event.getType() == EventType.JOB_STATUS_CHANGED) {
+
+      if (event.getType() == Type.JOB_STATUS_CHANGED) {
         updateFlow();
-      } else if (event.getType() == EventType.JOB_FINISHED) {
+      } else if (event.getType() == Type.JOB_FINISHED) {
+        final JobRunner runner = (JobRunner) event.getRunner();
+        final ExecutableNode node = runner.getNode();
         final EventData eventData = event.getData();
-        final JobRunner jobRunner = (JobRunner) event.getRunner();
-        final ExecutableNode node = jobRunner.getNode();
-        if (FlowRunner.this.azkabanEventReporter != null) {
-          final Map<String, String> jobMetadata = getJobMetadata(jobRunner);
-          jobMetadata.put("jobStatus", node.getStatus().name());
-          jobMetadata.put("endTime", String.valueOf(node.getEndTime()));
-          FlowRunner.this.azkabanEventReporter.report(event.getType(), jobMetadata);
-        }
         final long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
         synchronized (FlowRunner.this.mainSyncObj) {
           FlowRunner.this.logger.info("Job " + eventData.getNestedId() + " finished with status "
@@ -1193,18 +1127,12 @@ public class FlowRunner extends EventHandler implements Runnable {
           }
 
           FlowRunner.this.finishedNodes.add(node);
-          FlowRunner.this.activeJobRunners.remove(jobRunner);
+          FlowRunner.this.activeJobRunners.remove(runner);
           node.getParentFlow().setUpdateTime(System.currentTimeMillis());
           interrupt();
           fireEventListeners(event);
         }
-      } else if (event.getType() == EventType.JOB_STARTED) {
-        final EventData eventData = event.getData();
-        FlowRunner.this.logger.info("Job Started: " + eventData.getNestedId());
-        if (FlowRunner.this.azkabanEventReporter != null) {
-          final JobRunner jobRunner = (JobRunner) event.getRunner();
-          FlowRunner.this.azkabanEventReporter.report(event.getType(), getJobMetadata(jobRunner));
-        }
+      } else if (event.getType() == Type.JOB_STARTED) {
         // add job level checker
         final TriggerManager triggerManager = ServiceProvider.SERVICE_PROVIDER
             .getInstance(TriggerManager.class);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 761e6db..50cdd4b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -35,8 +35,6 @@ import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectWhitelist;
 import azkaban.project.ProjectWhitelist.WhitelistType;
 import azkaban.sla.SlaOption;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.spi.EventType;
 import azkaban.storage.StorageManager;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
@@ -63,7 +61,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.apache.commons.io.FileUtils;
@@ -119,7 +116,7 @@ public class FlowRunnerManager implements EventListener,
   private final JobTypeManager jobtypeManager;
   private final FlowPreparer flowPreparer;
   private final TriggerManager triggerManager;
-  private final AzkabanEventReporter azkabanEventReporter;
+
 
   private final Props azkabanProps;
   private final File executionDirectory;
@@ -154,13 +151,11 @@ public class FlowRunnerManager implements EventListener,
       final ExecutorLoader executorLoader,
       final ProjectLoader projectLoader,
       final StorageManager storageManager,
-      final TriggerManager triggerManager,
-      @Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
+      final TriggerManager triggerManager) throws IOException {
     this.azkabanProps = props;
 
     this.executionDirRetention = props.getLong("execution.dir.retention",
         this.executionDirRetention);
-    this.azkabanEventReporter = azkabanEventReporter;
     logger.info("Execution dir retention set to " + this.executionDirRetention + " ms");
 
     this.executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
@@ -365,7 +360,7 @@ public class FlowRunnerManager implements EventListener,
 
     final FlowRunner runner =
         new FlowRunner(flow, this.executorLoader, this.projectLoader, this.jobtypeManager,
-            this.azkabanProps, this.azkabanEventReporter);
+            this.azkabanProps);
     runner.setFlowWatcher(watcher)
         .setJobLogSettings(this.jobLogChunkSize, this.jobLogNumFiles)
         .setValidateProxyUser(this.validateProxyUser)
@@ -492,16 +487,16 @@ public class FlowRunnerManager implements EventListener,
 
   @Override
   public void handleEvent(final Event event) {
-    if (event.getType() == EventType.FLOW_FINISHED || event.getType() == EventType.FLOW_STARTED) {
+    if (event.getType() == Event.Type.FLOW_FINISHED || event.getType() == Event.Type.FLOW_STARTED) {
       final FlowRunner flowRunner = (FlowRunner) event.getRunner();
       final ExecutableFlow flow = flowRunner.getExecutableFlow();
 
-      if (event.getType() == EventType.FLOW_FINISHED) {
+      if (event.getType() == Event.Type.FLOW_FINISHED) {
         this.recentlyFinishedFlows.put(flow.getExecutionId(), flow);
         logger.info("Flow " + flow.getExecutionId()
             + " is finished. Adding it to recently finished flows list.");
         this.runningFlows.remove(flow.getExecutionId());
-      } else if (event.getType() == EventType.FLOW_STARTED) {
+      } else if (event.getType() == Event.Type.FLOW_STARTED) {
         // add flow level SLA checker
         this.triggerManager
             .addTrigger(flow.getExecutionId(), SlaOption.getFlowLevelSLAOptions(flow));
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
index 24e2bef..461c4b1 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
@@ -6,7 +6,6 @@ import azkaban.event.EventListener;
 import azkaban.execapp.JobRunner;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.Status;
-import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import java.util.HashMap;
 import java.util.Map;
@@ -109,16 +108,16 @@ public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
             + eventData.getStatus());
       }
 
-      if (event.getType() == EventType.JOB_STARTED) {
+      if (event.getType() == Event.Type.JOB_STARTED) {
         this.runningJobCount.incrementAndGet();
-      } else if (event.getType() == EventType.JOB_FINISHED) {
+      } else if (event.getType() == Event.Type.JOB_FINISHED) {
         this.totalExecutedJobCount.incrementAndGet();
         if (this.runningJobCount.intValue() > 0) {
           this.runningJobCount.decrementAndGet();
         } else {
           logger.warn("runningJobCount not messed up, it is already zero "
               + "and we are trying to decrement on job event "
-              + EventType.JOB_FINISHED);
+              + Event.Type.JOB_FINISHED);
         }
 
         if (eventData.getStatus() == Status.FAILED) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 84fff8d..85f5a4c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -18,6 +18,7 @@ package azkaban.execapp;
 
 import azkaban.Constants;
 import azkaban.event.Event;
+import azkaban.event.Event.Type;
 import azkaban.event.EventData;
 import azkaban.event.EventHandler;
 import azkaban.execapp.event.BlockingStatus;
@@ -33,7 +34,6 @@ import azkaban.jobExecutor.JavaProcessJob;
 import azkaban.jobExecutor.Job;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
-import azkaban.spi.EventType;
 import azkaban.utils.ExternalLinkUtils;
 import azkaban.utils.PatternLayoutEscaped;
 import azkaban.utils.Props;
@@ -412,12 +412,12 @@ public class JobRunner extends EventHandler implements Runnable {
       if (quickFinish) {
         this.node.setStartTime(time);
         fireEvent(
-            Event.create(this, EventType.JOB_STARTED,
+            Event.create(this, Type.JOB_STARTED,
                 new EventData(nodeStatus, this.node.getNestedId())));
         this.node.setEndTime(time);
         fireEvent(
             Event
-                .create(this, EventType.JOB_FINISHED,
+                .create(this, Type.JOB_FINISHED,
                     new EventData(nodeStatus, this.node.getNestedId())));
         return true;
       }
@@ -580,13 +580,13 @@ public class JobRunner extends EventHandler implements Runnable {
     Status finalStatus = this.node.getStatus();
     uploadExecutableNode();
     if (!errorFound && !isKilled()) {
-      fireEvent(Event.create(this, EventType.JOB_STARTED, new EventData(this.node)));
+      fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(this.node)));
 
       final Status prepareStatus = prepareJob();
       if (prepareStatus != null) {
         // Writes status to the db
         writeStatus();
-        fireEvent(Event.create(this, EventType.JOB_STATUS_CHANGED,
+        fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED,
             new EventData(prepareStatus, this.node.getNestedId())));
         finalStatus = runJob();
       } else {
@@ -609,7 +609,7 @@ public class JobRunner extends EventHandler implements Runnable {
         "Finishing job " + this.jobId + getNodeRetryLog() + " at " + this.node.getEndTime()
             + " with status " + this.node.getStatus());
 
-    fireEvent(Event.create(this, EventType.JOB_FINISHED,
+    fireEvent(Event.create(this, Type.JOB_FINISHED,
         new EventData(finalStatus, this.node.getNestedId())), false);
     finalizeLogFile(this.node.getAttempt());
     finalizeAttachmentFile();
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
index b48b85f..6116583 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -17,13 +17,13 @@
 package azkaban.execapp.metric;
 
 import azkaban.event.Event;
+import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
 import azkaban.execapp.FlowRunner;
 import azkaban.executor.Status;
 import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
-import azkaban.spi.EventType;
 
 /**
  * Metric to keep track of number of failed flows in between the tracking events
@@ -47,7 +47,7 @@ public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> imple
    */
   @Override
   public synchronized void handleEvent(final Event event) {
-    if (event.getType() == EventType.FLOW_FINISHED) {
+    if (event.getType() == Type.FLOW_FINISHED) {
       final FlowRunner runner = (FlowRunner) event.getRunner();
       if (runner != null && runner.getExecutableFlow().getStatus().equals(Status.FAILED)) {
         this.value = this.value + 1;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
index 7ad0a46..b069b0b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -17,12 +17,12 @@
 package azkaban.execapp.metric;
 
 import azkaban.event.Event;
+import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
 import azkaban.executor.Status;
 import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
-import azkaban.spi.EventType;
 
 /**
  * Metric to keep track of number of failed jobs in between the tracking events
@@ -45,8 +45,7 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
    */
   @Override
   public synchronized void handleEvent(final Event event) {
-    if (event.getType() == EventType.JOB_FINISHED && Status.FAILED
-        .equals(event.getData().getStatus())) {
+    if (event.getType() == Type.JOB_FINISHED && Status.FAILED.equals(event.getData().getStatus())) {
       this.value = this.value + 1;
     }
   }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index a6ac7ee..5be6b68 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -17,11 +17,11 @@
 package azkaban.execapp.metric;
 
 import azkaban.event.Event;
+import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
 import azkaban.metric.MetricException;
 import azkaban.metric.MetricReportManager;
 import azkaban.metric.TimeBasedReportingMetric;
-import azkaban.spi.EventType;
 
 /**
  * Metric to keep track of number of running jobs in Azkaban exec server
@@ -49,9 +49,9 @@ public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> imple
    */
   @Override
   public synchronized void handleEvent(final Event event) {
-    if (event.getType() == EventType.JOB_STARTED) {
+    if (event.getType() == Type.JOB_STARTED) {
       this.value = this.value + 1;
-    } else if (event.getType() == EventType.JOB_FINISHED) {
+    } else if (event.getType() == Type.JOB_FINISHED) {
       this.value = this.value - 1;
     }
   }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
index f585ea6..681acd5 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
@@ -19,7 +19,6 @@ package azkaban.execapp.event;
 import static org.mockito.Mockito.mock;
 
 import azkaban.execapp.EventCollectorListener;
-import azkaban.execapp.EventReporterUtil;
 import azkaban.execapp.FlowRunner;
 import azkaban.execapp.FlowRunnerTestUtil;
 import azkaban.executor.ExecutableFlow;
@@ -31,7 +30,6 @@ import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
@@ -44,8 +42,6 @@ import org.junit.Test;
 
 public class LocalFlowWatcherTest {
 
-  private final AzkabanEventReporter azkabanEventReporter =
-      EventReporterUtil.getTestAzkabanEventReporter();
   private JobTypeManager jobtypeManager;
   private int dirVal = 0;
 
@@ -164,14 +160,14 @@ public class LocalFlowWatcherTest {
 
   private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
+      Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
 
       // check it's start time is after the first's children.
       final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
       if (watchedNode == null) {
         continue;
       }
-      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
+      Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
 
       System.out.println("Node " + node.getId() + " start: "
           + node.getStartTime() + " dependent on " + watchedNode.getId() + " "
@@ -198,14 +194,14 @@ public class LocalFlowWatcherTest {
 
   private void testPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
+      Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
 
       // check it's start time is after the first's children.
       final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
       if (watchedNode == null) {
         continue;
       }
-      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
+      Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
 
       long minDiff = Long.MAX_VALUE;
       for (final String watchedChild : watchedNode.getOutNodes()) {
@@ -213,7 +209,7 @@ public class LocalFlowWatcherTest {
         if (child == null) {
           continue;
         }
-        Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
+        Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
         final long diff = node.getStartTime() - child.getEndTime();
         minDiff = Math.min(minDiff, diff);
         System.out.println("Node " + node.getId() + " start: "
@@ -256,7 +252,7 @@ public class LocalFlowWatcherTest {
     }
     loader.uploadExecutableFlow(exFlow);
     final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
-        this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
+        this.jobtypeManager, azkabanProps);
     runner.setFlowWatcher(watcher);
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index 69c4835..e5130b2 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -19,7 +19,6 @@ package azkaban.execapp.event;
 import static org.mockito.Mockito.mock;
 
 import azkaban.execapp.EventCollectorListener;
-import azkaban.execapp.EventReporterUtil;
 import azkaban.execapp.FlowRunner;
 import azkaban.execapp.FlowRunnerTestUtil;
 import azkaban.execapp.jmx.JmxJobMBeanManager;
@@ -33,7 +32,6 @@ import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
@@ -48,8 +46,6 @@ import org.junit.rules.TemporaryFolder;
 
 public class RemoteFlowWatcherTest {
 
-  private final AzkabanEventReporter azkabanEventReporter =
-      EventReporterUtil.getTestAzkabanEventReporter();
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private JobTypeManager jobtypeManager;
@@ -186,14 +182,14 @@ public class RemoteFlowWatcherTest {
 
   private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
+      Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
 
       // check it's start time is after the first's children.
       final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
       if (watchedNode == null) {
         continue;
       }
-      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
+      Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
 
       System.out.println("Node " + node.getId() + " start: "
           + node.getStartTime() + " dependent on " + watchedNode.getId() + " "
@@ -219,7 +215,7 @@ public class RemoteFlowWatcherTest {
   private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second,
       final boolean job4Skipped) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
+      Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
 
       // check it's start time is after the first's children.
       final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
@@ -279,7 +275,7 @@ public class RemoteFlowWatcherTest {
 
     loader.uploadExecutableFlow(exFlow);
     final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
-        this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
+        this.jobtypeManager, azkabanProps);
     runner.setFlowWatcher(watcher);
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java b/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
index 28fb57a..adf17e0 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
@@ -19,8 +19,8 @@ package azkaban.execapp;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 import azkaban.event.Event;
+import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
-import azkaban.spi.EventType;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -31,9 +31,9 @@ public class EventCollectorListener implements EventListener {
   public static final Object handleEvent = new Object();
   // CopyOnWriteArrayList allows concurrent iteration and modification
   private final List<Event> eventList = new CopyOnWriteArrayList<>();
-  private final HashSet<EventType> filterOutTypes = new HashSet<>();
+  private final HashSet<Event.Type> filterOutTypes = new HashSet<>();
 
-  public void setEventFilterOut(final EventType... types) {
+  public void setEventFilterOut(final Event.Type... types) {
     this.filterOutTypes.addAll(Arrays.asList(types));
   }
 
@@ -62,7 +62,7 @@ public class EventCollectorListener implements EventListener {
     return true;
   }
 
-  public void assertEvents(final EventType... expected) {
+  public void assertEvents(final Type... expected) {
     final Object[] captured = this.eventList.stream()
         .filter(event -> event.getRunner() != null)
         .map(event -> event.getType())
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 24efe86..0439510 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -33,14 +33,12 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.log4j.Logger;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -59,8 +57,6 @@ import org.junit.rules.TemporaryFolder;
 public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
   private static int id = 101;
-  private final AzkabanEventReporter azkabanEventReporter =
-      EventReporterUtil.getTestAzkabanEventReporter();
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private File workingDir;
@@ -579,8 +575,8 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
     final FlowRunner runner =
         new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
-            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
-            this.azkabanEventReporter);
+            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
+
     runner.addListener(eventCollector);
 
     return runner;
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index cbfb3a8..367cb4e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -29,7 +29,6 @@ import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
@@ -60,8 +59,6 @@ import org.junit.Test;
 public class FlowRunnerPropertyResolutionTest {
 
   private static int id = 101;
-  private final AzkabanEventReporter azkabanEventReporter =
-      EventReporterUtil.getTestAzkabanEventReporter();
   private File workingDir;
   private JobTypeManager jobtypeManager;
   private ExecutorLoader fakeExecutorLoader;
@@ -216,8 +213,7 @@ public class FlowRunnerPropertyResolutionTest {
 
     final FlowRunner runner =
         new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
-            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
-            this.azkabanEventReporter);
+            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
     return runner;
   }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index be40548..f1cf66c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -20,6 +20,8 @@ import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.when;
 
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
 import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
@@ -31,8 +33,6 @@ import azkaban.jobExecutor.AllJobExecutorTests;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.spi.EventType;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
@@ -48,8 +48,6 @@ import org.mockito.MockitoAnnotations;
 public class FlowRunnerTest extends FlowRunnerTestBase {
 
   private static final File TEST_DIR = ExecutionsTestUtil.getFlowDir("exectest1");
-  private final AzkabanEventReporter azkabanEventReporter =
-      EventReporterUtil.getTestAzkabanEventReporter();
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private File workingDir;
@@ -78,8 +76,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
   @Test
   public void exec1Normal() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
-        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
     this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
 
     startThread(this.runner);
@@ -99,14 +97,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job8", Status.SUCCEEDED);
     assertStatus("job10", Status.SUCCEEDED);
 
-    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
+    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
   }
 
   @Test
   public void exec1Disabled() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
-        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
     final ExecutableFlow exFlow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, TEST_DIR, "exec1", 1);
 
@@ -139,14 +137,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job8", Status.SUCCEEDED);
     assertStatus("job10", Status.SKIPPED);
 
-    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
+    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
   }
 
   @Test
   public void exec1Failed() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
-        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
     final ExecutableFlow flow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
 
@@ -170,14 +168,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job10", Status.CANCELLED);
     assertThreadShutDown();
 
-    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
+    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
   }
 
   @Test
   public void exec1FailedKillAll() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
-        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
     final ExecutableFlow flow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
     flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
@@ -202,14 +200,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job9", Status.CANCELLED);
     assertStatus("job10", Status.CANCELLED);
 
-    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
+    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
   }
 
   @Test
   public void exec1FailedFinishRest() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
-        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
     final ExecutableFlow flow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, TEST_DIR, "exec3", 1);
     flow.getExecutionOptions().setFailureAction(
@@ -233,14 +231,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job10", Status.CANCELLED);
     assertThreadShutDown();
 
-    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
+    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
   }
 
   @Test
   public void execAndCancel() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
-        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
     this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
 
     startThread(this.runner);
@@ -263,14 +261,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
 
     waitForAndAssertFlowStatus(Status.KILLED);
 
-    eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
+    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
   }
 
   @Test
   public void execRetries() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
-        EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
     this.runner = createFlowRunner(this.loader, eventCollector, "exec4-retry");
 
     startThread(this.runner);
@@ -342,8 +340,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
 
     loader.uploadExecutableFlow(flow);
     final FlowRunner runner =
-        new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
-            this.azkabanEventReporter);
+        new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps);
 
     runner.addListener(eventCollector);
 
@@ -364,8 +361,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     loader.uploadExecutableFlow(exFlow);
 
     final FlowRunner runner =
-        new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
-            this.azkabanEventReporter);
+        new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps);
 
     runner.addListener(eventCollector);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 6a25699..e5b9817 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -35,7 +35,6 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
@@ -96,8 +95,6 @@ import org.junit.rules.TemporaryFolder;
 public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
   private static int id = 101;
-  private final AzkabanEventReporter azkabanEventReporter =
-      EventReporterUtil.getTestAzkabanEventReporter();
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private File workingDir;
@@ -1130,7 +1127,8 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
     final FlowRunner runner = new FlowRunner(
         this.fakeExecutorLoader.fetchExecutableFlow(exId), this.fakeExecutorLoader,
-        mock(ProjectLoader.class), this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
+        mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
+
     runner.addListener(eventCollector);
 
     return runner;
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index ea46cef..53cf411 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -19,6 +19,7 @@ package azkaban.execapp;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import azkaban.event.Event;
+import azkaban.event.Event.Type;
 import azkaban.event.EventData;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
@@ -29,7 +30,6 @@ import azkaban.executor.Status;
 import azkaban.jobExecutor.ProcessJob;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
-import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
@@ -83,12 +83,12 @@ public class JobRunnerTest {
         createJobRunner(1, "testJob", 0, false, loader, eventCollector);
     final ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
         || runner.getStatus() != Status.FAILED);
 
     runner.run();
-    eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -103,8 +103,7 @@ public class JobRunnerTest {
 
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 
-    eventCollector
-        .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
+    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
   }
 
   @Ignore
@@ -133,8 +132,7 @@ public class JobRunnerTest {
     Assert.assertTrue(!runner.isKilled());
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 
-    eventCollector
-        .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
+    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
   }
 
   @Test
@@ -165,7 +163,7 @@ public class JobRunnerTest {
 
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
 
-    eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
+    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_FINISHED);
   }
 
   @Test
@@ -196,7 +194,7 @@ public class JobRunnerTest {
     Assert.assertTrue(outputProps == null);
     Assert.assertTrue(runner.getLogFilePath() == null);
     Assert.assertTrue(!runner.isKilled());
-    eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
+    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_FINISHED);
   }
 
   @Ignore
@@ -235,8 +233,7 @@ public class JobRunnerTest {
     Assert.assertTrue(logFile.exists());
     Assert.assertTrue(eventCollector.checkOrdering());
     Assert.assertTrue(runner.isKilled());
-    eventCollector
-        .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
+    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
   }
 
   @Ignore
@@ -250,11 +247,11 @@ public class JobRunnerTest {
     final long startTime = System.currentTimeMillis();
     final ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
     runner.run();
-    eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -271,8 +268,7 @@ public class JobRunnerTest {
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 
     Assert.assertTrue(eventCollector.checkOrdering());
-    eventCollector
-        .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
+    eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
   }
 
   @Test
@@ -285,7 +281,7 @@ public class JobRunnerTest {
     final long startTime = System.currentTimeMillis();
     final ExecutableNode node = runner.getNode();
 
-    eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
     final Thread thread = new Thread(runner);
@@ -297,7 +293,7 @@ public class JobRunnerTest {
     runner.kill();
     StatusTestUtils.waitForStatus(node, Status.KILLED);
 
-    eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
+    eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
@@ -316,7 +312,7 @@ public class JobRunnerTest {
     // wait so that there's time to make the "DB update" for KILLED status
     azkaban.test.TestUtils.await().untilAsserted(
         () -> assertThat(loader.getNodeUpdateCount("testJob")).isEqualTo(2));
-    eventCollector.assertEvents(EventType.JOB_FINISHED);
+    eventCollector.assertEvents(Event.Type.JOB_FINISHED);
   }
 
   private Props createProps(final int sleepSec, final boolean fail) {

build.gradle 5(+1 -4)

diff --git a/build.gradle b/build.gradle
index a048d5a..a152178 100644
--- a/build.gradle
+++ b/build.gradle
@@ -48,9 +48,6 @@ allprojects {
     repositories {
         mavenCentral()
         mavenLocal()
-        maven {
-            url "http://packages.confluent.io/maven/"
-        }
     }
 }
 
@@ -70,7 +67,6 @@ ext.deps = [
         dbcp2               : 'org.apache.commons:commons-dbcp2:2.1.1',
         dbutils             : 'commons-dbutils:commons-dbutils:1.5',
         fileupload          : 'commons-fileupload:commons-fileupload:1.2.1',
-        gobblinKafka        : 'com.linkedin.gobblin:gobblin-kafka-08:0.11.0',
         gson                : 'com.google.code.gson:gson:2.8.1',
         guava               : 'com.google.guava:guava:21.0',
         guice               : 'com.google.inject:guice:4.1.0',
@@ -153,6 +149,7 @@ subprojects {
             compile deps.log4j
             compile deps.guice
             compile deps.slf4j
+
             runtime deps.slf4jLog4j
 
             testCompile deps.assertj