azkaban-aplcache
Changes
az-core/src/main/java/azkaban/Constants.java 11(+0 -11)
azkaban-exec-server/build.gradle 14(+0 -14)
azkaban-exec-server/src/main/java/azkaban/execapp/reporter/AzkabanKafkaAvroEventReporter.java 124(+0 -124)
build.gradle 5(+1 -4)
Details
az-core/src/main/java/azkaban/Constants.java 11(+0 -11)
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 53e006c..77d2151 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -124,17 +124,6 @@ public class Constants {
public static final String AZKABAN_KEYTAB_PATH = "azkaban.keytab.path";
public static final String PROJECT_TEMP_DIR = "project.temp.dir";
- // Event reporting properties
- public static final String AZKABAN_EVENT_REPORTING_CLASS_PARAM =
- "azkaban.event.reporting.class";
- public static final String AZKABAN_EVENT_REPORTING_ENABLED = "azkaban.event.reporting.enabled";
- public static final String AZKABAN_EVENT_REPORTING_KAFKA_BROKERS =
- "azkaban.event.reporting.kafka.brokers";
- public static final String AZKABAN_EVENT_REPORTING_KAFKA_TOPIC =
- "azkaban.event.reporting.kafka.topic";
- public static final String AZKABAN_EVENT_REPORTING_KAFKA_SCHEMA_REGISTRY_URL =
- "azkaban.event.reporting.kafka.schema.registry.url";
-
/*
* The max number of artifacts retained per project.
* Accepted Values:
diff --git a/azkaban-common/src/main/java/azkaban/event/Event.java b/azkaban-common/src/main/java/azkaban/event/Event.java
index 832dd99..fc79b4a 100644
--- a/azkaban-common/src/main/java/azkaban/event/Event.java
+++ b/azkaban-common/src/main/java/azkaban/event/Event.java
@@ -16,17 +16,16 @@
package azkaban.event;
-import azkaban.spi.EventType;
import com.google.common.base.Preconditions;
public class Event {
private final Object runner;
- private final EventType type;
+ private final Type type;
private final EventData eventData;
private final long time;
- private Event(final Object runner, final EventType type, final EventData eventData) {
+ private Event(final Object runner, final Type type, final EventData eventData) {
this.runner = runner;
this.type = type;
this.eventData = eventData;
@@ -42,7 +41,7 @@ public class Event {
* @return New Event instance.
* @throws NullPointerException if EventData is null.
*/
- public static Event create(final Object runner, final EventType type, final EventData eventData)
+ public static Event create(final Object runner, final Type type, final EventData eventData)
throws NullPointerException {
Preconditions.checkNotNull(eventData, "EventData was null");
return new Event(runner, type, eventData);
@@ -52,7 +51,7 @@ public class Event {
return this.runner;
}
- public EventType getType() {
+ public Type getType() {
return this.type;
}
@@ -64,5 +63,14 @@ public class Event {
return this.eventData;
}
+ public enum Type {
+ FLOW_STARTED,
+ FLOW_FINISHED,
+ JOB_STARTED,
+ JOB_FINISHED,
+ JOB_STATUS_CHANGED,
+ EXTERNAL_FLOW_UPDATED,
+ EXTERNAL_JOB_UPDATED
+ }
}
azkaban-exec-server/build.gradle 14(+0 -14)
diff --git a/azkaban-exec-server/build.gradle b/azkaban-exec-server/build.gradle
index 909088f..1d35d4d 100644
--- a/azkaban-exec-server/build.gradle
+++ b/azkaban-exec-server/build.gradle
@@ -5,10 +5,6 @@ dependencies {
compile(project(':azkaban-common'))
compile deps.kafkaLog4jAppender
- compile(deps.gobblinKafka) {
- exclude group: 'org.apache.hadoop'
- }
-
runtime(project(':azkaban-hadoop-security-plugin'))
testCompile(project(path: ':azkaban-common', configuration: 'testCompile'))
@@ -21,16 +17,6 @@ dependencies {
testCompile deps.hadoopHdfs
}
-configurations.compile {
- exclude group: 'com.linkedin.gobblin', module: 'gobblin-api'
- exclude group: 'com.linkedin.gobblin', module: 'gobblin-metrics-graphite'
- exclude group: 'com.linkedin.gobblin', module: 'gobblin-metrics-hadoop'
- exclude group: 'com.linkedin.gobblin', module: 'gobblin-metrics-influxdb'
- exclude group: 'com.linkedin.gobblin', module: 'gobblin-runtime'
-
- exclude group: 'org.projectlombok', module: 'lombok'
-}
-
distributions {
main {
contents {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
index 75f84ef..683ca35 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecServerModule.java
@@ -17,21 +17,9 @@
package azkaban.execapp;
-import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_CLASS_PARAM;
-import static azkaban.Constants.ConfigurationKeys.AZKABAN_EVENT_REPORTING_ENABLED;
-
-import azkaban.execapp.reporter.AzkabanKafkaAvroEventReporter;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.JdbcExecutorLoader;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.utils.Props;
import com.google.inject.AbstractModule;
-import com.google.inject.Provides;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import javax.inject.Inject;
-import javax.inject.Singleton;
-import org.apache.log4j.Logger;
/**
* This Guice module is currently a one place container for all bindings in the current module. This
@@ -40,47 +28,9 @@ import org.apache.log4j.Logger;
*/
public class AzkabanExecServerModule extends AbstractModule {
- private static final Logger logger = Logger.getLogger(AzkabanExecServerModule.class);
-
@Override
protected void configure() {
install(new ExecJettyServerModule());
bind(ExecutorLoader.class).to(JdbcExecutorLoader.class);
}
-
- @Inject
- @Provides
- @Singleton
- public AzkabanEventReporter createAzkabanEventReporter(final Props props) {
- final boolean eventReporterEnabled =
- props.getBoolean(AZKABAN_EVENT_REPORTING_ENABLED, false);
-
- if (!eventReporterEnabled) {
- logger.info("Event reporter is not enabled");
- return null;
- }
-
- final Class<?> eventReporterClass =
- props.getClass(AZKABAN_EVENT_REPORTING_CLASS_PARAM, AzkabanKafkaAvroEventReporter.class);
- if (eventReporterClass != null && eventReporterClass.getConstructors().length > 0) {
- this.logger.info("Loading event reporter class " + eventReporterClass.getName());
- try {
- final Constructor<?> eventReporterClassConstructor =
- eventReporterClass.getConstructor(Props.class);
- return (AzkabanEventReporter) eventReporterClassConstructor.newInstance(props);
- } catch (final InvocationTargetException e) {
- this.logger.error(e.getTargetException().getMessage());
- if (e.getTargetException() instanceof IllegalArgumentException) {
- throw new IllegalArgumentException(e);
- } else {
- throw new RuntimeException(e);
- }
- } catch (final Exception e) {
- this.logger.error("Could not instantiate EventReporter " + eventReporterClass.getName());
- throw new RuntimeException(e);
- }
- }
- return null;
- }
-
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
index 44bcc59..30f96ce 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/JobCallbackManager.java
@@ -14,7 +14,6 @@ import azkaban.execapp.jmx.JmxJobCallback;
import azkaban.execapp.jmx.JmxJobCallbackMBean;
import azkaban.executor.Status;
import azkaban.jobcallback.JobCallbackStatusEnum;
-import azkaban.spi.EventType;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import java.net.InetAddress;
@@ -109,9 +108,9 @@ public class JobCallbackManager implements EventListener {
if (event.getRunner() instanceof JobRunner) {
try {
- if (event.getType() == EventType.JOB_STARTED) {
+ if (event.getType() == Event.Type.JOB_STARTED) {
processJobCallOnStart(event);
- } else if (event.getType() == EventType.JOB_FINISHED) {
+ } else if (event.getType() == Event.Type.JOB_FINISHED) {
processJobCallOnFinish(event);
}
} catch (final Throwable e) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
index f90470a..1376249 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -17,12 +17,12 @@
package azkaban.execapp.event;
import azkaban.event.Event;
+import azkaban.event.Event.Type;
import azkaban.event.EventData;
import azkaban.event.EventListener;
import azkaban.execapp.FlowRunner;
import azkaban.execapp.JobRunner;
import azkaban.executor.ExecutableNode;
-import azkaban.spi.EventType;
public class LocalFlowWatcher extends FlowWatcher {
@@ -58,7 +58,7 @@ public class LocalFlowWatcher extends FlowWatcher {
@Override
public void handleEvent(final Event event) {
- if (event.getType() == EventType.JOB_FINISHED) {
+ if (event.getType() == Type.JOB_FINISHED) {
if (event.getRunner() instanceof FlowRunner) {
// The flow runner will finish a job without it running
final EventData eventData = event.getData();
@@ -72,7 +72,7 @@ public class LocalFlowWatcher extends FlowWatcher {
System.out.println(node + " looks like " + node.getStatus());
handleJobStatusChange(node.getNestedId(), node.getStatus());
}
- } else if (event.getType() == EventType.FLOW_FINISHED) {
+ } else if (event.getType() == Type.FLOW_FINISHED) {
stopWatcher();
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index d5ac66f..76be252 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -16,10 +16,9 @@
package azkaban.execapp;
-import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
-
import azkaban.ServiceProvider;
import azkaban.event.Event;
+import azkaban.event.Event.Type;
import azkaban.event.EventData;
import azkaban.event.EventHandler;
import azkaban.event.EventListener;
@@ -44,8 +43,6 @@ import azkaban.metric.MetricReportManager;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
import azkaban.sla.SlaOption;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.spi.EventType;
import azkaban.utils.Props;
import azkaban.utils.SwapQueue;
import com.google.common.collect.ImmutableSet;
@@ -94,12 +91,10 @@ public class FlowRunner extends EventHandler implements Runnable {
private final Props azkabanProps;
private final Map<String, Props> sharedProps = new HashMap<>();
private final JobRunnerEventListener listener = new JobRunnerEventListener();
- private final FlowRunnerEventListener flowListener = new FlowRunnerEventListener();
private final Set<JobRunner> activeJobRunners = Collections
.newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
// Thread safe swap queue for finishedExecutions.
private final SwapQueue<ExecutableNode> finishedNodes;
- private final AzkabanEventReporter azkabanEventReporter;
private Logger logger;
private Appender flowAppender;
private File logFile;
@@ -109,16 +104,21 @@ public class FlowRunner extends EventHandler implements Runnable {
// Used for pipelining
private Integer pipelineLevel = null;
private Integer pipelineExecId = null;
+
// Watches external flows for execution.
private FlowWatcher watcher = null;
+
private Set<String> proxyUsers = null;
private boolean validateUserProxy;
+
private String jobLogFileSize = "5MB";
private int jobLogNumFiles = 4;
+
private boolean flowPaused = false;
private boolean flowFailed = false;
private boolean flowFinished = false;
private boolean flowKilled = false;
+
// The following is state that will trigger a retry of all failed jobs
private boolean retryFailedJobs = false;
@@ -127,10 +127,9 @@ public class FlowRunner extends EventHandler implements Runnable {
*/
public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
- final Props azkabanProps, final AzkabanEventReporter azkabanEventReporter)
+ final Props azkabanProps)
throws ExecutorManagerException {
- this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps,
- azkabanEventReporter);
+ this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps);
}
/**
@@ -138,8 +137,7 @@ public class FlowRunner extends EventHandler implements Runnable {
*/
public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
- final ExecutorService executorService, final Props azkabanProps,
- final AzkabanEventReporter azkabanEventReporter)
+ final ExecutorService executorService, final Props azkabanProps)
throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
@@ -156,15 +154,10 @@ public class FlowRunner extends EventHandler implements Runnable {
this.executorService = executorService;
this.finishedNodes = new SwapQueue<>();
this.azkabanProps = azkabanProps;
- // Add the flow listener only if a non-null eventReporter is available.
- if (azkabanEventReporter != null) {
- this.addListener(this.flowListener);
- }
// Create logger and execution dir in flowRunner initialization instead of flow runtime to avoid NPE
// where the uninitialized logger is used in flow preparing state
createLogger(this.flow.getFlowId());
- this.azkabanEventReporter = azkabanEventReporter;
}
public FlowRunner setFlowWatcher(final FlowWatcher watcher) {
@@ -208,7 +201,7 @@ public class FlowRunner extends EventHandler implements Runnable {
loadAllProperties();
this.fireEventListeners(
- Event.create(this, EventType.FLOW_STARTED, new EventData(this.getExecutableFlow())));
+ Event.create(this, Type.FLOW_STARTED, new EventData(this.getExecutableFlow())));
runFlow();
} catch (final Throwable t) {
if (this.logger != null) {
@@ -233,8 +226,7 @@ public class FlowRunner extends EventHandler implements Runnable {
closeLogger();
updateFlow();
} finally {
- this.fireEventListeners(
- Event.create(this, EventType.FLOW_FINISHED, new EventData(this.flow)));
+ this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED, new EventData(this.flow)));
}
}
}
@@ -296,6 +288,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
+
/**
* setup logger and execution dir for the flowId
*/
@@ -566,7 +559,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private void finishExecutableNode(final ExecutableNode node) {
this.finishedNodes.add(node);
final EventData eventData = new EventData(node.getStatus(), node.getNestedId());
- fireEventListeners(Event.create(this, EventType.JOB_FINISHED, eventData));
+ fireEventListeners(Event.create(this, Type.JOB_FINISHED, eventData));
}
private void finalizeFlow(final ExecutableFlowBase flow) {
@@ -1106,79 +1099,20 @@ public class FlowRunner extends EventHandler implements Runnable {
return ImmutableSet.copyOf(this.activeJobRunners);
}
- // Class helps report the flow start and stop events.
- private class FlowRunnerEventListener implements EventListener {
-
- public FlowRunnerEventListener() {
- }
-
- private synchronized Map<String, String> getFlowMetadata(final FlowRunner flowRunner) {
- final ExecutableFlow flow = flowRunner.getExecutableFlow();
- final Props props = ServiceProvider.SERVICE_PROVIDER.getInstance(Props.class);
- final Map<String, String> metaData = new HashMap<>();
- metaData.put("flowName", flow.getId());
- metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
- metaData.put("projectName", flow.getProjectName());
- metaData.put("submitUser", flow.getSubmitUser());
- metaData.put("executionId", String.valueOf(flow.getExecutionId()));
- metaData.put("startTime", String.valueOf(flow.getStartTime()));
- metaData.put("submitTime", String.valueOf(flow.getSubmitTime()));
- return metaData;
- }
-
- @Override
- public synchronized void handleEvent(final Event event) {
- if (event.getType() == EventType.FLOW_STARTED) {
- final FlowRunner flowRunner = (FlowRunner) event.getRunner();
- final ExecutableFlow flow = flowRunner.getExecutableFlow();
- FlowRunner.this.logger.info("Flow started: " + flow.getId());
- FlowRunner.this.azkabanEventReporter.report(event.getType(), getFlowMetadata(flowRunner));
- } else if (event.getType() == EventType.FLOW_FINISHED) {
- final FlowRunner flowRunner = (FlowRunner) event.getRunner();
- final ExecutableFlow flow = flowRunner.getExecutableFlow();
- FlowRunner.this.logger.info("Flow ended: " + flow.getId());
- final Map<String, String> flowMetadata = getFlowMetadata(flowRunner);
- flowMetadata.put("endTime", String.valueOf(flow.getEndTime()));
- flowMetadata.put("flowStatus", flow.getStatus().name());
- FlowRunner.this.azkabanEventReporter.report(event.getType(), flowMetadata);
- }
- }
- }
-
private class JobRunnerEventListener implements EventListener {
public JobRunnerEventListener() {
}
- private synchronized Map<String, String> getJobMetadata(final JobRunner jobRunner) {
- final ExecutableNode node = jobRunner.getNode();
- final Props props = ServiceProvider.SERVICE_PROVIDER.getInstance(Props.class);
- final Map<String, String> metaData = new HashMap<>();
- metaData.put("jobId", node.getId());
- metaData.put("executionID", String.valueOf(node.getExecutableFlow().getExecutionId()));
- metaData.put("flowName", node.getExecutableFlow().getId());
- metaData.put("startTime", String.valueOf(node.getStartTime()));
- metaData.put("jobType", String.valueOf(node.getType()));
- metaData.put("azkabanHost", props.getString(AZKABAN_SERVER_HOST_NAME, "unknown"));
- metaData.put("jobProxyUser",
- jobRunner.getProps().getString("user.to.proxy", null));
- return metaData;
- }
-
@Override
public synchronized void handleEvent(final Event event) {
- if (event.getType() == EventType.JOB_STATUS_CHANGED) {
+
+ if (event.getType() == Type.JOB_STATUS_CHANGED) {
updateFlow();
- } else if (event.getType() == EventType.JOB_FINISHED) {
+ } else if (event.getType() == Type.JOB_FINISHED) {
+ final JobRunner runner = (JobRunner) event.getRunner();
+ final ExecutableNode node = runner.getNode();
final EventData eventData = event.getData();
- final JobRunner jobRunner = (JobRunner) event.getRunner();
- final ExecutableNode node = jobRunner.getNode();
- if (FlowRunner.this.azkabanEventReporter != null) {
- final Map<String, String> jobMetadata = getJobMetadata(jobRunner);
- jobMetadata.put("jobStatus", node.getStatus().name());
- jobMetadata.put("endTime", String.valueOf(node.getEndTime()));
- FlowRunner.this.azkabanEventReporter.report(event.getType(), jobMetadata);
- }
final long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
synchronized (FlowRunner.this.mainSyncObj) {
FlowRunner.this.logger.info("Job " + eventData.getNestedId() + " finished with status "
@@ -1193,18 +1127,12 @@ public class FlowRunner extends EventHandler implements Runnable {
}
FlowRunner.this.finishedNodes.add(node);
- FlowRunner.this.activeJobRunners.remove(jobRunner);
+ FlowRunner.this.activeJobRunners.remove(runner);
node.getParentFlow().setUpdateTime(System.currentTimeMillis());
interrupt();
fireEventListeners(event);
}
- } else if (event.getType() == EventType.JOB_STARTED) {
- final EventData eventData = event.getData();
- FlowRunner.this.logger.info("Job Started: " + eventData.getNestedId());
- if (FlowRunner.this.azkabanEventReporter != null) {
- final JobRunner jobRunner = (JobRunner) event.getRunner();
- FlowRunner.this.azkabanEventReporter.report(event.getType(), getJobMetadata(jobRunner));
- }
+ } else if (event.getType() == Type.JOB_STARTED) {
// add job level checker
final TriggerManager triggerManager = ServiceProvider.SERVICE_PROVIDER
.getInstance(TriggerManager.class);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 761e6db..50cdd4b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -35,8 +35,6 @@ import azkaban.project.ProjectLoader;
import azkaban.project.ProjectWhitelist;
import azkaban.project.ProjectWhitelist.WhitelistType;
import azkaban.sla.SlaOption;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.spi.EventType;
import azkaban.storage.StorageManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
@@ -63,7 +61,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
@@ -119,7 +116,7 @@ public class FlowRunnerManager implements EventListener,
private final JobTypeManager jobtypeManager;
private final FlowPreparer flowPreparer;
private final TriggerManager triggerManager;
- private final AzkabanEventReporter azkabanEventReporter;
+
private final Props azkabanProps;
private final File executionDirectory;
@@ -154,13 +151,11 @@ public class FlowRunnerManager implements EventListener,
final ExecutorLoader executorLoader,
final ProjectLoader projectLoader,
final StorageManager storageManager,
- final TriggerManager triggerManager,
- @Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
+ final TriggerManager triggerManager) throws IOException {
this.azkabanProps = props;
this.executionDirRetention = props.getLong("execution.dir.retention",
this.executionDirRetention);
- this.azkabanEventReporter = azkabanEventReporter;
logger.info("Execution dir retention set to " + this.executionDirRetention + " ms");
this.executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
@@ -365,7 +360,7 @@ public class FlowRunnerManager implements EventListener,
final FlowRunner runner =
new FlowRunner(flow, this.executorLoader, this.projectLoader, this.jobtypeManager,
- this.azkabanProps, this.azkabanEventReporter);
+ this.azkabanProps);
runner.setFlowWatcher(watcher)
.setJobLogSettings(this.jobLogChunkSize, this.jobLogNumFiles)
.setValidateProxyUser(this.validateProxyUser)
@@ -492,16 +487,16 @@ public class FlowRunnerManager implements EventListener,
@Override
public void handleEvent(final Event event) {
- if (event.getType() == EventType.FLOW_FINISHED || event.getType() == EventType.FLOW_STARTED) {
+ if (event.getType() == Event.Type.FLOW_FINISHED || event.getType() == Event.Type.FLOW_STARTED) {
final FlowRunner flowRunner = (FlowRunner) event.getRunner();
final ExecutableFlow flow = flowRunner.getExecutableFlow();
- if (event.getType() == EventType.FLOW_FINISHED) {
+ if (event.getType() == Event.Type.FLOW_FINISHED) {
this.recentlyFinishedFlows.put(flow.getExecutionId(), flow);
logger.info("Flow " + flow.getExecutionId()
+ " is finished. Adding it to recently finished flows list.");
this.runningFlows.remove(flow.getExecutionId());
- } else if (event.getType() == EventType.FLOW_STARTED) {
+ } else if (event.getType() == Event.Type.FLOW_STARTED) {
// add flow level SLA checker
this.triggerManager
.addTrigger(flow.getExecutionId(), SlaOption.getFlowLevelSLAOptions(flow));
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
index 24e2bef..461c4b1 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/jmx/JmxJobMBeanManager.java
@@ -6,7 +6,6 @@ import azkaban.event.EventListener;
import azkaban.execapp.JobRunner;
import azkaban.executor.ExecutableNode;
import azkaban.executor.Status;
-import azkaban.spi.EventType;
import azkaban.utils.Props;
import java.util.HashMap;
import java.util.Map;
@@ -109,16 +108,16 @@ public class JmxJobMBeanManager implements JmxJobMXBean, EventListener {
+ eventData.getStatus());
}
- if (event.getType() == EventType.JOB_STARTED) {
+ if (event.getType() == Event.Type.JOB_STARTED) {
this.runningJobCount.incrementAndGet();
- } else if (event.getType() == EventType.JOB_FINISHED) {
+ } else if (event.getType() == Event.Type.JOB_FINISHED) {
this.totalExecutedJobCount.incrementAndGet();
if (this.runningJobCount.intValue() > 0) {
this.runningJobCount.decrementAndGet();
} else {
logger.warn("runningJobCount not messed up, it is already zero "
+ "and we are trying to decrement on job event "
- + EventType.JOB_FINISHED);
+ + Event.Type.JOB_FINISHED);
}
if (eventData.getStatus() == Status.FAILED) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 84fff8d..85f5a4c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -18,6 +18,7 @@ package azkaban.execapp;
import azkaban.Constants;
import azkaban.event.Event;
+import azkaban.event.Event.Type;
import azkaban.event.EventData;
import azkaban.event.EventHandler;
import azkaban.execapp.event.BlockingStatus;
@@ -33,7 +34,6 @@ import azkaban.jobExecutor.JavaProcessJob;
import azkaban.jobExecutor.Job;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
-import azkaban.spi.EventType;
import azkaban.utils.ExternalLinkUtils;
import azkaban.utils.PatternLayoutEscaped;
import azkaban.utils.Props;
@@ -412,12 +412,12 @@ public class JobRunner extends EventHandler implements Runnable {
if (quickFinish) {
this.node.setStartTime(time);
fireEvent(
- Event.create(this, EventType.JOB_STARTED,
+ Event.create(this, Type.JOB_STARTED,
new EventData(nodeStatus, this.node.getNestedId())));
this.node.setEndTime(time);
fireEvent(
Event
- .create(this, EventType.JOB_FINISHED,
+ .create(this, Type.JOB_FINISHED,
new EventData(nodeStatus, this.node.getNestedId())));
return true;
}
@@ -580,13 +580,13 @@ public class JobRunner extends EventHandler implements Runnable {
Status finalStatus = this.node.getStatus();
uploadExecutableNode();
if (!errorFound && !isKilled()) {
- fireEvent(Event.create(this, EventType.JOB_STARTED, new EventData(this.node)));
+ fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(this.node)));
final Status prepareStatus = prepareJob();
if (prepareStatus != null) {
// Writes status to the db
writeStatus();
- fireEvent(Event.create(this, EventType.JOB_STATUS_CHANGED,
+ fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED,
new EventData(prepareStatus, this.node.getNestedId())));
finalStatus = runJob();
} else {
@@ -609,7 +609,7 @@ public class JobRunner extends EventHandler implements Runnable {
"Finishing job " + this.jobId + getNodeRetryLog() + " at " + this.node.getEndTime()
+ " with status " + this.node.getStatus());
- fireEvent(Event.create(this, EventType.JOB_FINISHED,
+ fireEvent(Event.create(this, Type.JOB_FINISHED,
new EventData(finalStatus, this.node.getNestedId())), false);
finalizeLogFile(this.node.getAttempt());
finalizeAttachmentFile();
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
index b48b85f..6116583 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -17,13 +17,13 @@
package azkaban.execapp.metric;
import azkaban.event.Event;
+import azkaban.event.Event.Type;
import azkaban.event.EventListener;
import azkaban.execapp.FlowRunner;
import azkaban.executor.Status;
import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
-import azkaban.spi.EventType;
/**
* Metric to keep track of number of failed flows in between the tracking events
@@ -47,7 +47,7 @@ public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> imple
*/
@Override
public synchronized void handleEvent(final Event event) {
- if (event.getType() == EventType.FLOW_FINISHED) {
+ if (event.getType() == Type.FLOW_FINISHED) {
final FlowRunner runner = (FlowRunner) event.getRunner();
if (runner != null && runner.getExecutableFlow().getStatus().equals(Status.FAILED)) {
this.value = this.value + 1;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
index 7ad0a46..b069b0b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -17,12 +17,12 @@
package azkaban.execapp.metric;
import azkaban.event.Event;
+import azkaban.event.Event.Type;
import azkaban.event.EventListener;
import azkaban.executor.Status;
import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
-import azkaban.spi.EventType;
/**
* Metric to keep track of number of failed jobs in between the tracking events
@@ -45,8 +45,7 @@ public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implem
*/
@Override
public synchronized void handleEvent(final Event event) {
- if (event.getType() == EventType.JOB_FINISHED && Status.FAILED
- .equals(event.getData().getStatus())) {
+ if (event.getType() == Type.JOB_FINISHED && Status.FAILED.equals(event.getData().getStatus())) {
this.value = this.value + 1;
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index a6ac7ee..5be6b68 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -17,11 +17,11 @@
package azkaban.execapp.metric;
import azkaban.event.Event;
+import azkaban.event.Event.Type;
import azkaban.event.EventListener;
import azkaban.metric.MetricException;
import azkaban.metric.MetricReportManager;
import azkaban.metric.TimeBasedReportingMetric;
-import azkaban.spi.EventType;
/**
* Metric to keep track of number of running jobs in Azkaban exec server
@@ -49,9 +49,9 @@ public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> imple
*/
@Override
public synchronized void handleEvent(final Event event) {
- if (event.getType() == EventType.JOB_STARTED) {
+ if (event.getType() == Type.JOB_STARTED) {
this.value = this.value + 1;
- } else if (event.getType() == EventType.JOB_FINISHED) {
+ } else if (event.getType() == Type.JOB_FINISHED) {
this.value = this.value - 1;
}
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
index f585ea6..681acd5 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
@@ -19,7 +19,6 @@ package azkaban.execapp.event;
import static org.mockito.Mockito.mock;
import azkaban.execapp.EventCollectorListener;
-import azkaban.execapp.EventReporterUtil;
import azkaban.execapp.FlowRunner;
import azkaban.execapp.FlowRunnerTestUtil;
import azkaban.executor.ExecutableFlow;
@@ -31,7 +30,6 @@ import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
@@ -44,8 +42,6 @@ import org.junit.Test;
public class LocalFlowWatcherTest {
- private final AzkabanEventReporter azkabanEventReporter =
- EventReporterUtil.getTestAzkabanEventReporter();
private JobTypeManager jobtypeManager;
private int dirVal = 0;
@@ -164,14 +160,14 @@ public class LocalFlowWatcherTest {
private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
for (final ExecutableNode node : second.getExecutableNodes()) {
- Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
+ Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
// check it's start time is after the first's children.
final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
- Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
+ Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
System.out.println("Node " + node.getId() + " start: "
+ node.getStartTime() + " dependent on " + watchedNode.getId() + " "
@@ -198,14 +194,14 @@ public class LocalFlowWatcherTest {
private void testPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
for (final ExecutableNode node : second.getExecutableNodes()) {
- Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
+ Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
// check it's start time is after the first's children.
final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
- Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
+ Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
long minDiff = Long.MAX_VALUE;
for (final String watchedChild : watchedNode.getOutNodes()) {
@@ -213,7 +209,7 @@ public class LocalFlowWatcherTest {
if (child == null) {
continue;
}
- Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
+ Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
final long diff = node.getStartTime() - child.getEndTime();
minDiff = Math.min(minDiff, diff);
System.out.println("Node " + node.getId() + " start: "
@@ -256,7 +252,7 @@ public class LocalFlowWatcherTest {
}
loader.uploadExecutableFlow(exFlow);
final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
- this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
+ this.jobtypeManager, azkabanProps);
runner.setFlowWatcher(watcher);
runner.addListener(eventCollector);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index 69c4835..e5130b2 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -19,7 +19,6 @@ package azkaban.execapp.event;
import static org.mockito.Mockito.mock;
import azkaban.execapp.EventCollectorListener;
-import azkaban.execapp.EventReporterUtil;
import azkaban.execapp.FlowRunner;
import azkaban.execapp.FlowRunnerTestUtil;
import azkaban.execapp.jmx.JmxJobMBeanManager;
@@ -33,7 +32,6 @@ import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
@@ -48,8 +46,6 @@ import org.junit.rules.TemporaryFolder;
public class RemoteFlowWatcherTest {
- private final AzkabanEventReporter azkabanEventReporter =
- EventReporterUtil.getTestAzkabanEventReporter();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private JobTypeManager jobtypeManager;
@@ -186,14 +182,14 @@ public class RemoteFlowWatcherTest {
private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
for (final ExecutableNode node : second.getExecutableNodes()) {
- Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
+ Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
// check it's start time is after the first's children.
final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
- Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
+ Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
System.out.println("Node " + node.getId() + " start: "
+ node.getStartTime() + " dependent on " + watchedNode.getId() + " "
@@ -219,7 +215,7 @@ public class RemoteFlowWatcherTest {
private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second,
final boolean job4Skipped) {
for (final ExecutableNode node : second.getExecutableNodes()) {
- Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
+ Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
// check it's start time is after the first's children.
final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
@@ -279,7 +275,7 @@ public class RemoteFlowWatcherTest {
loader.uploadExecutableFlow(exFlow);
final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
- this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
+ this.jobtypeManager, azkabanProps);
runner.setFlowWatcher(watcher);
runner.addListener(eventCollector);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java b/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
index 28fb57a..adf17e0 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
@@ -19,8 +19,8 @@ package azkaban.execapp;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import azkaban.event.Event;
+import azkaban.event.Event.Type;
import azkaban.event.EventListener;
-import azkaban.spi.EventType;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -31,9 +31,9 @@ public class EventCollectorListener implements EventListener {
public static final Object handleEvent = new Object();
// CopyOnWriteArrayList allows concurrent iteration and modification
private final List<Event> eventList = new CopyOnWriteArrayList<>();
- private final HashSet<EventType> filterOutTypes = new HashSet<>();
+ private final HashSet<Event.Type> filterOutTypes = new HashSet<>();
- public void setEventFilterOut(final EventType... types) {
+ public void setEventFilterOut(final Event.Type... types) {
this.filterOutTypes.addAll(Arrays.asList(types));
}
@@ -62,7 +62,7 @@ public class EventCollectorListener implements EventListener {
return true;
}
- public void assertEvents(final EventType... expected) {
+ public void assertEvents(final Type... expected) {
final Object[] captured = this.eventList.stream()
.filter(event -> event.getRunner() != null)
.map(event -> event.getType())
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 24efe86..0439510 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -33,14 +33,12 @@ import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
-import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -59,8 +57,6 @@ import org.junit.rules.TemporaryFolder;
public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
private static int id = 101;
- private final AzkabanEventReporter azkabanEventReporter =
- EventReporterUtil.getTestAzkabanEventReporter();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File workingDir;
@@ -579,8 +575,8 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
final FlowRunner runner =
new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
- this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
- this.azkabanEventReporter);
+ this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
+
runner.addListener(eventCollector);
return runner;
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index cbfb3a8..367cb4e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -29,7 +29,6 @@ import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
@@ -60,8 +59,6 @@ import org.junit.Test;
public class FlowRunnerPropertyResolutionTest {
private static int id = 101;
- private final AzkabanEventReporter azkabanEventReporter =
- EventReporterUtil.getTestAzkabanEventReporter();
private File workingDir;
private JobTypeManager jobtypeManager;
private ExecutorLoader fakeExecutorLoader;
@@ -216,8 +213,7 @@ public class FlowRunnerPropertyResolutionTest {
final FlowRunner runner =
new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
- this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
- this.azkabanEventReporter);
+ this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
return runner;
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index be40548..f1cf66c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -20,6 +20,8 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.when;
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
@@ -31,8 +33,6 @@ import azkaban.jobExecutor.AllJobExecutorTests;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.spi.EventType;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
@@ -48,8 +48,6 @@ import org.mockito.MockitoAnnotations;
public class FlowRunnerTest extends FlowRunnerTestBase {
private static final File TEST_DIR = ExecutionsTestUtil.getFlowDir("exectest1");
- private final AzkabanEventReporter azkabanEventReporter =
- EventReporterUtil.getTestAzkabanEventReporter();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File workingDir;
@@ -78,8 +76,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
@Test
public void exec1Normal() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
- EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+ Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
startThread(this.runner);
@@ -99,14 +97,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertStatus("job8", Status.SUCCEEDED);
assertStatus("job10", Status.SUCCEEDED);
- eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
+ eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
}
@Test
public void exec1Disabled() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
- EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+ Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
final ExecutableFlow exFlow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, "exec1", 1);
@@ -139,14 +137,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertStatus("job8", Status.SUCCEEDED);
assertStatus("job10", Status.SKIPPED);
- eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
+ eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
}
@Test
public void exec1Failed() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
- EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+ Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
final ExecutableFlow flow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
@@ -170,14 +168,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertStatus("job10", Status.CANCELLED);
assertThreadShutDown();
- eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
+ eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
}
@Test
public void exec1FailedKillAll() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
- EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+ Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
final ExecutableFlow flow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
@@ -202,14 +200,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertStatus("job9", Status.CANCELLED);
assertStatus("job10", Status.CANCELLED);
- eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
+ eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
}
@Test
public void exec1FailedFinishRest() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
- EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+ Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
final ExecutableFlow flow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, "exec3", 1);
flow.getExecutionOptions().setFailureAction(
@@ -233,14 +231,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertStatus("job10", Status.CANCELLED);
assertThreadShutDown();
- eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
+ eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
}
@Test
public void execAndCancel() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
- EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+ Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
startThread(this.runner);
@@ -263,14 +261,14 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
waitForAndAssertFlowStatus(Status.KILLED);
- eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
+ eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
}
@Test
public void execRetries() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
- EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
+ Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
this.runner = createFlowRunner(this.loader, eventCollector, "exec4-retry");
startThread(this.runner);
@@ -342,8 +340,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
loader.uploadExecutableFlow(flow);
final FlowRunner runner =
- new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
- this.azkabanEventReporter);
+ new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps);
runner.addListener(eventCollector);
@@ -364,8 +361,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
loader.uploadExecutableFlow(exFlow);
final FlowRunner runner =
- new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
- this.azkabanEventReporter);
+ new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps);
runner.addListener(eventCollector);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 6a25699..e5b9817 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -35,7 +35,6 @@ import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
@@ -96,8 +95,6 @@ import org.junit.rules.TemporaryFolder;
public class FlowRunnerTest2 extends FlowRunnerTestBase {
private static int id = 101;
- private final AzkabanEventReporter azkabanEventReporter =
- EventReporterUtil.getTestAzkabanEventReporter();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File workingDir;
@@ -1130,7 +1127,8 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
final FlowRunner runner = new FlowRunner(
this.fakeExecutorLoader.fetchExecutableFlow(exId), this.fakeExecutorLoader,
- mock(ProjectLoader.class), this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
+ mock(ProjectLoader.class), this.jobtypeManager, azkabanProps);
+
runner.addListener(eventCollector);
return runner;
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index ea46cef..53cf411 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -19,6 +19,7 @@ package azkaban.execapp;
import static org.assertj.core.api.Assertions.assertThat;
import azkaban.event.Event;
+import azkaban.event.Event.Type;
import azkaban.event.EventData;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
@@ -29,7 +30,6 @@ import azkaban.executor.Status;
import azkaban.jobExecutor.ProcessJob;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
-import azkaban.spi.EventType;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
@@ -83,12 +83,12 @@ public class JobRunnerTest {
createJobRunner(1, "testJob", 0, false, loader, eventCollector);
final ExecutableNode node = runner.getNode();
- eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
|| runner.getStatus() != Status.FAILED);
runner.run();
- eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
@@ -103,8 +103,7 @@ public class JobRunnerTest {
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
- eventCollector
- .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
+ eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
}
@Ignore
@@ -133,8 +132,7 @@ public class JobRunnerTest {
Assert.assertTrue(!runner.isKilled());
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
- eventCollector
- .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
+ eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
}
@Test
@@ -165,7 +163,7 @@ public class JobRunnerTest {
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
- eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
+ eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_FINISHED);
}
@Test
@@ -196,7 +194,7 @@ public class JobRunnerTest {
Assert.assertTrue(outputProps == null);
Assert.assertTrue(runner.getLogFilePath() == null);
Assert.assertTrue(!runner.isKilled());
- eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
+ eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_FINISHED);
}
@Ignore
@@ -235,8 +233,7 @@ public class JobRunnerTest {
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
Assert.assertTrue(runner.isKilled());
- eventCollector
- .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
+ eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
}
@Ignore
@@ -250,11 +247,11 @@ public class JobRunnerTest {
final long startTime = System.currentTimeMillis();
final ExecutableNode node = runner.getNode();
- eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
runner.run();
- eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
@@ -271,8 +268,7 @@ public class JobRunnerTest {
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
Assert.assertTrue(eventCollector.checkOrdering());
- eventCollector
- .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
+ eventCollector.assertEvents(Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED);
}
@Test
@@ -285,7 +281,7 @@ public class JobRunnerTest {
final long startTime = System.currentTimeMillis();
final ExecutableNode node = runner.getNode();
- eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
final Thread thread = new Thread(runner);
@@ -297,7 +293,7 @@ public class JobRunnerTest {
runner.kill();
StatusTestUtils.waitForStatus(node, Status.KILLED);
- eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
+ eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
@@ -316,7 +312,7 @@ public class JobRunnerTest {
// wait so that there's time to make the "DB update" for KILLED status
azkaban.test.TestUtils.await().untilAsserted(
() -> assertThat(loader.getNodeUpdateCount("testJob")).isEqualTo(2));
- eventCollector.assertEvents(EventType.JOB_FINISHED);
+ eventCollector.assertEvents(Event.Type.JOB_FINISHED);
}
private Props createProps(final int sleepSec, final boolean fail) {
build.gradle 5(+1 -4)
diff --git a/build.gradle b/build.gradle
index a048d5a..a152178 100644
--- a/build.gradle
+++ b/build.gradle
@@ -48,9 +48,6 @@ allprojects {
repositories {
mavenCentral()
mavenLocal()
- maven {
- url "http://packages.confluent.io/maven/"
- }
}
}
@@ -70,7 +67,6 @@ ext.deps = [
dbcp2 : 'org.apache.commons:commons-dbcp2:2.1.1',
dbutils : 'commons-dbutils:commons-dbutils:1.5',
fileupload : 'commons-fileupload:commons-fileupload:1.2.1',
- gobblinKafka : 'com.linkedin.gobblin:gobblin-kafka-08:0.11.0',
gson : 'com.google.code.gson:gson:2.8.1',
guava : 'com.google.guava:guava:21.0',
guice : 'com.google.inject:guice:4.1.0',
@@ -153,6 +149,7 @@ subprojects {
compile deps.log4j
compile deps.guice
compile deps.slf4j
+
runtime deps.slf4jLog4j
testCompile deps.assertj