Details
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index 066d3a5..4512a6c 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -44,7 +44,9 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
Map<String, LinkedList<InMemoryHistoryNode>> historyListMapping;
private static final String INMEMORY_METRIC_REPORTER_WINDOW = "azkaban.metric.inmemory.interval";
private static final String INMEMORY_METRIC_NUM_INSTANCES = "azkaban.metric.inmemory.maxinstances";
+ private static final String INMEMORY_METRIC_DEVIAION_FACTOR = "azkaban.metric.inmemory.statisticalDeviationFactor";
+ double statisticalDeviationFactor;
/**
* Interval (in millisecond) from today for which we should maintain the in memory snapshots
*/
@@ -61,6 +63,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
historyListMapping = new HashMap<String, LinkedList<InMemoryHistoryNode>>();
interval = azkProps.getLong(INMEMORY_METRIC_REPORTER_WINDOW, 60 * 60 * 24 * 7 * 1000);
numInstances = azkProps.getLong(INMEMORY_METRIC_NUM_INSTANCES, 50);
+ statisticalDeviationFactor = azkProps.getDouble(INMEMORY_METRIC_DEVIAION_FACTOR, 2);
}
/**
@@ -148,7 +151,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
InMemoryHistoryNode currentNode = ite.next();
double value = ((Number) currentNode.getValue()).doubleValue();
// remove all elements which lies in 95% value band
- if (value > mean + 2 * std && value < mean + 2 * std) {
+ // In-band means strictly between (mean - k * std) and (mean + k * std); the lower bound must subtract.
+ if (value > mean - statisticalDeviationFactor * std && value < mean + statisticalDeviationFactor * std) {
ite.remove();
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
index e58023e..db6db1b 100644
--- a/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/TimeBasedReportingMetric.java
@@ -48,8 +48,9 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
TimerTask recurringReporting = new TimerTask() {
@Override
public void run() {
- finalizeValue();
+ preTrackingEventMethod();
notifyManager();
+ postTrackingEventMethod();
}
};
return recurringReporting;
@@ -67,8 +68,13 @@ public abstract class TimeBasedReportingMetric<T> extends AbstractMetric<T> {
}
/**
- * This method is responsible for making a final update to value, if any
+ * This method is responsible for making any last-minute update to the value before it is reported
*/
- protected abstract void finalizeValue();
+ protected abstract void preTrackingEventMethod();
+
+ /**
+ * This method is responsible for any post-processing needed after the tracking event
+ */
+ protected abstract void postTrackingEventMethod();
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index aa35498..367da00 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -39,6 +39,9 @@ import org.mortbay.thread.QueuedThreadPool;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.execapp.jmx.JmxFlowRunnerManager;
+import azkaban.execapp.metric.NumFailedFlowMetric;
+import azkaban.execapp.metric.NumFailedJobMetric;
+import azkaban.execapp.metric.NumQueuedFlowMetric;
import azkaban.execapp.metric.NumRunningFlowMetric;
import azkaban.execapp.metric.NumRunningJobMetric;
import azkaban.jmx.JmxJettyServer;
@@ -138,15 +141,28 @@ public class AzkabanExecutorServer {
IMetricEmitter metricEmitter = new InMemoryMetricEmitter(props);
metricManager.addMetricEmitter(metricEmitter);
- logger.info("Adding number of Jobs metric");
+ logger.info("Adding number of failed flow metric");
+ metricManager.addMetric(new NumFailedFlowMetric(metricManager, props.getInt("executor.metric.interval."
+ + NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
+
+ logger.info("Adding number of failed jobs metric");
+ metricManager.addMetric(new NumFailedJobMetric(metricManager, props.getInt("executor.metric.interval."
+ + NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
+
+ logger.info("Adding number of running Jobs metric");
metricManager.addMetric(new NumRunningJobMetric(metricManager, props.getInt("executor.metric.interval."
+ NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME, props.getInt("executor.metric.interval.default"))));
- logger.info("Adding number of flows metric");
+ logger.info("Adding number of running flows metric");
metricManager.addMetric(new NumRunningFlowMetric(runnerManager, metricManager, props.getInt(
"executor.metric.interval." + NumRunningFlowMetric.NUM_RUNNING_FLOW_METRIC_NAME,
props.getInt("executor.metric.interval.default"))));
+ logger.info("Adding number of queued flows metric");
+ metricManager.addMetric(new NumQueuedFlowMetric(runnerManager, metricManager, props.getInt(
+ "executor.metric.interval." + NumQueuedFlowMetric.NUM_QUEUED_FLOW_METRIC_NAME,
+ props.getInt("executor.metric.interval.default"))));
+
logger.info("Completed configuring Metric Reports");
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index d3611b4..226a68a 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -42,6 +42,7 @@ import azkaban.event.Event.Type;
import azkaban.event.EventHandler;
import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.metric.NumFailedJobMetric;
import azkaban.execapp.metric.NumRunningJobMetric;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
@@ -53,7 +54,6 @@ import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.flow.FlowProps;
import azkaban.jobtype.JobTypeManager;
-import azkaban.metric.IMetric;
import azkaban.metric.MetricReportManager;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
@@ -61,13 +61,13 @@ import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.SwapQueue;
+
/**
* Class that handles the running of a ExecutableFlow DAG
*
*/
public class FlowRunner extends EventHandler implements Runnable {
- private static final Layout DEFAULT_LAYOUT = new PatternLayout(
- "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+ private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
// We check update every 5 minutes, just in case things get stuck. But for the
// most part, we'll be idling.
private static final long CHECK_WAIT_MS = 5 * 60 * 1000;
@@ -96,8 +96,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private final JobTypeManager jobtypeManager;
private JobRunnerEventListener listener = new JobRunnerEventListener();
- private Set<JobRunner> activeJobRunners = Collections
- .newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
+ private Set<JobRunner> activeJobRunners = Collections.newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
// Thread safe swap queue for finishedExecutions.
private SwapQueue<ExecutableNode> finishedNodes;
@@ -132,9 +131,8 @@ public class FlowRunner extends EventHandler implements Runnable {
* @param jobtypeManager
* @throws ExecutorManagerException
*/
- public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
- ProjectLoader projectLoader, JobTypeManager jobtypeManager)
- throws ExecutorManagerException {
+ public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader,
+ JobTypeManager jobtypeManager) throws ExecutorManagerException {
this(flow, executorLoader, projectLoader, jobtypeManager, null);
}
@@ -149,9 +147,8 @@ public class FlowRunner extends EventHandler implements Runnable {
* @param executorService
* @throws ExecutorManagerException
*/
- public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader,
- ProjectLoader projectLoader, JobTypeManager jobtypeManager,
- ExecutorService executorService) throws ExecutorManagerException {
+ public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader,
+ JobTypeManager jobtypeManager, ExecutorService executorService) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
this.executorLoader = executorLoader;
@@ -213,23 +210,18 @@ public class FlowRunner extends EventHandler implements Runnable {
runFlow();
} catch (Throwable t) {
if (logger != null) {
- logger
- .error(
- "An error has occurred during the running of the flow. Quiting.",
- t);
+ logger.error("An error has occurred during the running of the flow. Quiting.", t);
}
flow.setStatus(Status.FAILED);
} finally {
if (watcher != null) {
logger.info("Watcher is attached. Stopping watcher.");
watcher.stopWatcher();
- logger
- .info("Watcher cancelled status is " + watcher.isWatchCancelled());
+ logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
}
flow.setEndTime(System.currentTimeMillis());
- logger.info("Setting end time for flow " + execId + " to "
- + System.currentTimeMillis());
+ logger.info("Setting end time for flow " + execId + " to " + System.currentTimeMillis());
closeLogger();
updateFlow();
@@ -254,8 +246,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
// If there are flow overrides, we apply them now.
- Map<String, String> flowParam =
- flow.getExecutionOptions().getFlowParameters();
+ Map<String, String> flowParam = flow.getExecutionOptions().getFlowParameters();
if (flowParam != null && !flowParam.isEmpty()) {
commonFlowProps = new Props(commonFlowProps, flowParam);
}
@@ -268,11 +259,9 @@ public class FlowRunner extends EventHandler implements Runnable {
this.watcher.setLogger(logger);
}
- logger.info("Running execid:" + execId + " flow:" + flowId + " project:"
- + projectId + " version:" + version);
+ logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
if (pipelineExecId != null) {
- logger.info("Running simulateously with " + pipelineExecId
- + ". Pipelining level " + pipelineLevel);
+ logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
}
// The current thread is used for interrupting blocks
@@ -282,10 +271,8 @@ public class FlowRunner extends EventHandler implements Runnable {
private void updateFlowReference() throws ExecutorManagerException {
logger.info("Update active reference");
- if (!executorLoader.updateExecutableReference(execId,
- System.currentTimeMillis())) {
- throw new ExecutorManagerException(
- "The executor reference doesn't exist. May have been killed prematurely.");
+ if (!executorLoader.updateExecutableReference(execId, System.currentTimeMillis())) {
+ throw new ExecutorManagerException("The executor reference doesn't exist. May have been killed prematurely.");
}
}
@@ -408,8 +395,7 @@ public class FlowRunner extends EventHandler implements Runnable {
resetFailedState(this.flow, retryJobs);
for (ExecutableNode node : retryJobs) {
- if (node.getStatus() == Status.READY
- || node.getStatus() == Status.DISABLED) {
+ if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
runReadyJob(node);
} else if (node.getStatus() == Status.SUCCEEDED) {
for (String outNodeId : node.getOutNodes()) {
@@ -478,8 +464,7 @@ public class FlowRunner extends EventHandler implements Runnable {
// Instant kill or skip if necessary.
boolean jobsRun = false;
for (ExecutableNode node : nodesToCheck) {
- if (Status.isStatusFinished(node.getStatus())
- || Status.isStatusRunning(node.getStatus())) {
+ if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
// Really shouldn't get in here.
continue;
}
@@ -496,8 +481,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
private boolean runReadyJob(ExecutableNode node) throws IOException {
- if (Status.isStatusFinished(node.getStatus())
- || Status.isStatusRunning(node.getStatus())) {
+ if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
return false;
}
@@ -507,8 +491,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
if (nextNodeStatus == Status.CANCELLED) {
- logger.info("Cancelling '" + node.getNestedId()
- + "' due to prior errors.");
+ logger.info("Cancelling '" + node.getNestedId() + "' due to prior errors.");
node.cancelNode(System.currentTimeMillis());
finishExecutableNode(node);
} else if (nextNodeStatus == Status.SKIPPED) {
@@ -540,8 +523,8 @@ public class FlowRunner extends EventHandler implements Runnable {
}
if (node.getRetries() > node.getAttempt()) {
- logger.info("Job '" + node.getId() + "' will be retried. Attempt "
- + node.getAttempt() + " of " + node.getRetries());
+ logger.info("Job '" + node.getId() + "' will be retried. Attempt " + node.getAttempt() + " of "
+ + node.getRetries());
node.setDelayedExecution(node.getRetryBackoff());
node.resetForRetry();
return true;
@@ -582,8 +565,7 @@ public class FlowRunner extends EventHandler implements Runnable {
for (String end : flow.getEndNodes()) {
ExecutableNode node = flow.getExecutableNode(end);
- if (node.getStatus() == Status.KILLED
- || node.getStatus() == Status.FAILED
+ if (node.getStatus() == Status.KILLED || node.getStatus() == Status.FAILED
|| node.getStatus() == Status.CANCELLED) {
succeeded = false;
}
@@ -605,22 +587,19 @@ public class FlowRunner extends EventHandler implements Runnable {
flow.setUpdateTime(System.currentTimeMillis());
long durationSec = (flow.getEndTime() - flow.getStartTime()) / 1000;
switch (flow.getStatus()) {
- case FAILED_FINISHING:
- logger.info("Setting flow '" + id + "' status to FAILED in "
- + durationSec + " seconds");
- flow.setStatus(Status.FAILED);
- break;
- case FAILED:
- case KILLED:
- case CANCELLED:
- case FAILED_SUCCEEDED:
- logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString()
- + " in " + durationSec + " seconds");
- break;
- default:
- flow.setStatus(Status.SUCCEEDED);
- logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString()
- + " in " + durationSec + " seconds");
+ case FAILED_FINISHING:
+ logger.info("Setting flow '" + id + "' status to FAILED in " + durationSec + " seconds");
+ flow.setStatus(Status.FAILED);
+ break;
+ case FAILED:
+ case KILLED:
+ case CANCELLED:
+ case FAILED_SUCCEEDED:
+ logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
+ break;
+ default:
+ flow.setStatus(Status.SUCCEEDED);
+ logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
}
// If the finalized flow is actually the top level flow, than we finish
@@ -681,13 +660,10 @@ public class FlowRunner extends EventHandler implements Runnable {
// load the override props if any
try {
- props =
- projectLoader.fetchProjectProperty(flow.getProjectId(),
- flow.getVersion(), node.getId() + ".jor");
+ props = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId() + ".jor");
} catch (ProjectManagerException e) {
e.printStackTrace();
- logger.error("Error loading job override property for job "
- + node.getId());
+ logger.error("Error loading job override property for job " + node.getId());
}
File path = new File(execDir, source);
@@ -697,8 +673,7 @@ public class FlowRunner extends EventHandler implements Runnable {
props = new Props(null, path);
} catch (IOException e) {
e.printStackTrace();
- logger.error("Error loading job file " + source + " for job "
- + node.getId());
+ logger.error("Error loading job file " + source + " for job " + node.getId());
}
}
// setting this fake source as this will be used to determine the location
@@ -735,8 +710,7 @@ public class FlowRunner extends EventHandler implements Runnable {
public Status getImpliedStatus(ExecutableNode node) {
// If it's running or finished with 'SUCCEEDED', than don't even
// bother starting this job.
- if (Status.isStatusRunning(node.getStatus())
- || node.getStatus() == Status.SUCCEEDED) {
+ if (Status.isStatusRunning(node.getStatus()) || node.getStatus() == Status.SUCCEEDED) {
return null;
}
@@ -751,8 +725,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (!Status.isStatusFinished(depStatus)) {
return null;
- } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED
- || depStatus == Status.KILLED) {
+ } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED || depStatus == Status.KILLED) {
// We propagate failures as KILLED states.
shouldKill = true;
}
@@ -760,16 +733,14 @@ public class FlowRunner extends EventHandler implements Runnable {
// If it's disabled but ready to run, we want to make sure it continues
// being disabled.
- if (node.getStatus() == Status.DISABLED
- || node.getStatus() == Status.SKIPPED) {
+ if (node.getStatus() == Status.DISABLED || node.getStatus() == Status.SKIPPED) {
return Status.SKIPPED;
}
// If the flow has failed, and we want to finish only the currently running
// jobs, we just
// kill everything else. We also kill, if the flow has been cancelled.
- if (flowFailed
- && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
+ if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
return Status.CANCELLED;
} else if (shouldKill || isKilled()) {
return Status.CANCELLED;
@@ -783,8 +754,7 @@ public class FlowRunner extends EventHandler implements Runnable {
Props previousOutput = null;
// Iterate the in nodes again and create the dependencies
for (String dependency : node.getInNodes()) {
- Props output =
- node.getParentFlow().getExecutableNode(dependency).getOutputProps();
+ Props output = node.getParentFlow().getExecutableNode(dependency).getOutputProps();
if (output != null) {
output = Props.clone(output);
output.setParent(previousOutput);
@@ -799,9 +769,7 @@ public class FlowRunner extends EventHandler implements Runnable {
// Load job file.
File path = new File(execDir, node.getJobSource());
- JobRunner jobRunner =
- new JobRunner(node, path.getParentFile(), executorLoader,
- jobtypeManager);
+ JobRunner jobRunner = new JobRunner(node, path.getParentFile(), executorLoader, jobtypeManager);
if (watcher != null) {
jobRunner.setPipeline(watcher, pipelineLevel);
}
@@ -824,10 +792,17 @@ public class FlowRunner extends EventHandler implements Runnable {
*/
private void configureJobLevelMetrics(JobRunner jobRunner) {
logger.info("Configuring Azkaban metrics tracking for jobrunner object");
- if(MetricReportManager.isInstantiated()) {
+ if (MetricReportManager.isInstantiated()) {
MetricReportManager metricManager = MetricReportManager.getInstance();
- NumRunningJobMetric metric = (NumRunningJobMetric) metricManager.getMetricFromName(NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME);
- jobRunner.addListener(metric);
+
+ // Register the running-job metric as a listener; NOTE(review): lookup result is passed on unchecked - confirm it cannot be null
+ jobRunner.addListener((NumRunningJobMetric) metricManager
+ .getMetricFromName(NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME));
+
+ // Register the failed-job metric as a listener; NOTE(review): lookup result is passed on unchecked - confirm it cannot be null
+ jobRunner.addListener((NumFailedJobMetric) metricManager
+ .getMetricFromName(NumFailedJobMetric.NUM_FAILED_JOB_METRIC_NAME));
+
}
}
@@ -890,8 +865,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (watcher != null) {
logger.info("Watcher is attached. Stopping watcher.");
watcher.stopWatcher();
- logger
- .info("Watcher cancelled status is " + watcher.isWatchCancelled());
+ logger.info("Watcher cancelled status is " + watcher.isWatchCancelled());
}
logger.info("Killing " + activeJobRunners.size() + " jobs.");
@@ -909,8 +883,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
- private void resetFailedState(ExecutableFlowBase flow,
- List<ExecutableNode> nodesToRetry) {
+ private void resetFailedState(ExecutableFlowBase flow, List<ExecutableNode> nodesToRetry) {
// bottom up
LinkedList<ExecutableNode> queue = new LinkedList<ExecutableNode>();
for (String id : flow.getEndNodes()) {
@@ -939,24 +912,24 @@ public class FlowRunner extends EventHandler implements Runnable {
} else if (node instanceof ExecutableFlowBase) {
ExecutableFlowBase base = (ExecutableFlowBase) node;
switch (base.getStatus()) {
- case CANCELLED:
- node.setStatus(Status.READY);
- node.setEndTime(-1);
- node.setStartTime(-1);
- node.setUpdateTime(currentTime);
- // Break out of the switch. We'll reset the flow just like a normal
- // node
- break;
- case KILLED:
- case FAILED:
- case FAILED_FINISHING:
- resetFailedState(base, nodesToRetry);
- continue;
- default:
- // Continue the while loop. If the job is in a finished state that's
- // not
- // a failure, we don't want to reset the job.
- continue;
+ case CANCELLED:
+ node.setStatus(Status.READY);
+ node.setEndTime(-1);
+ node.setStartTime(-1);
+ node.setUpdateTime(currentTime);
+ // Break out of the switch. We'll reset the flow just like a normal
+ // node
+ break;
+ case KILLED:
+ case FAILED:
+ case FAILED_FINISHING:
+ resetFailedState(base, nodesToRetry);
+ continue;
+ default:
+ // Continue the while loop. If the job is in a finished state that's
+ // not
+ // a failure, we don't want to reset the job.
+ continue;
}
} else if (node.getStatus() == Status.CANCELLED) {
// Not a flow, but killed
@@ -964,16 +937,13 @@ public class FlowRunner extends EventHandler implements Runnable {
node.setStartTime(-1);
node.setEndTime(-1);
node.setUpdateTime(currentTime);
- } else if (node.getStatus() == Status.FAILED
- || node.getStatus() == Status.KILLED) {
+ } else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
node.resetForRetry();
nodesToRetry.add(node);
}
- if (!(node instanceof ExecutableFlowBase)
- && node.getStatus() != oldStatus) {
- logger.info("Resetting job '" + node.getNestedId() + "' from "
- + oldStatus + " to " + node.getStatus());
+ if (!(node instanceof ExecutableFlowBase) && node.getStatus() != oldStatus) {
+ logger.info("Resetting job '" + node.getNestedId() + "' from " + oldStatus + " to " + node.getStatus());
}
for (String inId : node.getInNodes()) {
@@ -995,16 +965,14 @@ public class FlowRunner extends EventHandler implements Runnable {
// start node has not.
for (String id : flow.getStartNodes()) {
ExecutableNode node = flow.getExecutableNode(id);
- if (node.getStatus() == Status.READY
- || node.getStatus() == Status.DISABLED) {
+ if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
nodesToRetry.add(node);
}
}
}
flow.setUpdateTime(System.currentTimeMillis());
flow.setEndTime(-1);
- logger.info("Resetting flow '" + flow.getNestedId() + "' from "
- + oldFlowState + " to " + flow.getStatus());
+ logger.info("Resetting flow '" + flow.getNestedId() + "' from " + oldFlowState + " to " + flow.getStatus());
}
private void interrupt() {
@@ -1025,14 +993,13 @@ public class FlowRunner extends EventHandler implements Runnable {
ExecutableNode node = runner.getNode();
long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
synchronized (mainSyncObj) {
- logger.info("Job " + node.getNestedId() + " finished with status "
- + node.getStatus() + " in " + seconds + " seconds");
+ logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in " + seconds
+ + " seconds");
// Cancellation is handled in the main thread, but if the flow is
// paused, the main thread is paused too.
// This unpauses the flow for cancellation.
- if (flowPaused && node.getStatus() == Status.FAILED
- && failureAction == FailureAction.CANCEL_ALL) {
+ if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {
flowPaused = false;
}
@@ -1075,8 +1042,7 @@ public class FlowRunner extends EventHandler implements Runnable {
ExecutableNode node = flow.getExecutableNodePath(jobId);
File path = new File(execDir, node.getJobSource());
- String attachmentFileName =
- JobRunner.createAttachmentFileName(node, attempt);
+ String attachmentFileName = JobRunner.createAttachmentFileName(node, attempt);
File attachmentFile = new File(path.getParentFile(), attachmentFileName);
if (!attachmentFile.exists()) {
return null;
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 54d82e8..9aa5420 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -42,12 +42,15 @@ import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.LocalFlowWatcher;
import azkaban.execapp.event.RemoteFlowWatcher;
+import azkaban.execapp.metric.NumFailedFlowMetric;
+import azkaban.execapp.metric.NumFailedJobMetric;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
+import azkaban.metric.MetricReportManager;
import azkaban.project.ProjectLoader;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
@@ -60,26 +63,26 @@ import azkaban.utils.TrackingThreadPool;
/**
* Execution manager for the server side execution.
- *
+ *
* When a flow is submitted to FlowRunnerManager, it is the
* {@link Status.PREPARING} status. When a flow is about to be executed by
* FlowRunner, its status is updated to {@link Status.RUNNING}
- *
+ *
* Two main data structures are used in this class to maintain flows.
- *
+ *
* runningFlows: this is used as a bookkeeping for submitted flows in
* FlowRunnerManager. It has nothing to do with the executor service that is
* used to execute the flows. This bookkeeping is used at the time of canceling
* or killing a flow. The flows in this data structure is removed in the
* handleEvent method.
- *
+ *
* submittedFlows: this is used to keep track the execution of the flows, so it
* has the mapping between a Future<?> and an execution id. This would allow us
* to find out the execution ids of the flows that are in the Status.PREPARING
* status. The entries in this map is removed once the flow execution is
* completed.
- *
- *
+ *
+ *
*/
public class FlowRunnerManager implements EventListener,
ThreadPoolExecutingListener {
@@ -483,6 +486,8 @@ public class FlowRunnerManager implements EventListener,
.setValidateProxyUser(validateProxyUser)
.setNumJobThreads(numJobThreads).addListener(this);
+ configureFlowLevelMetrics(runner);
+
// Check again.
if (runningFlows.containsKey(execId)) {
throw new ExecutorManagerException("Execution " + execId
@@ -508,6 +513,21 @@ public class FlowRunnerManager implements EventListener,
}
}
+ /**
+  * Hooks the flow-level Azkaban metrics up to a freshly created flow runner.
+  * Does nothing beyond logging when the metric report manager has not been instantiated.
+  *
+  * @param flowRunner runner that the flow-level metric listeners are attached to
+  */
+ private void configureFlowLevelMetrics(FlowRunner flowRunner) {
+   logger.info("Configuring Azkaban metrics tracking for flow runner object");
+
+   // Metrics are optional: bail out early when reporting is not set up
+   if (!MetricReportManager.isInstantiated()) {
+     return;
+   }
+
+   MetricReportManager reportManager = MetricReportManager.getInstance();
+
+   // Look up the failed-flow metric by name and subscribe it to this runner's events
+   NumFailedFlowMetric failedFlowMetric =
+       (NumFailedFlowMetric) reportManager.getMetricFromName(NumFailedFlowMetric.NUM_FAILED_FLOW_METRIC_NAME);
+   flowRunner.addListener(failedFlowMetric);
+ }
+
private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
int execId = flow.getExecutionId();
File execPath = new File(executionDirectory, String.valueOf(execId));
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
new file mode 100644
index 0000000..99f6b03
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedFlowMetric.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
+import azkaban.event.EventListener;
+import azkaban.execapp.FlowRunner;
+import azkaban.executor.Status;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+
+/**
+ * Metric to keep track of number of failed flows in between the tracking events
+ */
+public class NumFailedFlowMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
+  public static final String NUM_FAILED_FLOW_METRIC_NAME = "NumFailedFlowMetric";
+  private static final String NUM_FAILED_FLOW_METRIC_TYPE = "uint16";
+
+  /**
+   * Creates the metric with a starting count of 0.
+   *
+   * @param manager metric report manager handed to the base metric
+   * @param interval reporting interval handed to the base metric
+   *        (NOTE(review): units are defined by TimeBasedReportingMetric - confirm)
+   */
+  public NumFailedFlowMetric(MetricReportManager manager, long interval) {
+    super(NUM_FAILED_FLOW_METRIC_NAME, NUM_FAILED_FLOW_METRIC_TYPE, 0, manager, interval);
+    // Log this class's own name (the message previously named NumFailedJobMetric)
+    logger.debug("Instantiated NumFailedFlowMetric");
+  }
+
+  /**
+   * Listen for events to maintain correct value of number of failed flows
+   * {@inheritDoc}
+   * @see azkaban.event.EventListener#handleEvent(azkaban.event.Event)
+   */
+  @Override
+  public synchronized void handleEvent(Event event) {
+    if (event.getType() == Type.FLOW_FINISHED) {
+      FlowRunner runner = (FlowRunner) event.getRunner();
+      // Constant-first equals keeps the comparison safe even if the flow status is null
+      if (runner != null && Status.FAILED.equals(runner.getExecutableFlow().getStatus())) {
+        value = value + 1;
+      }
+    }
+  }
+
+  @Override
+  protected void preTrackingEventMethod() {
+    // Nothing to finalize before tracking event
+  }
+
+  /**
+   * Resets the failure count to 0 so the next count starts fresh.
+   */
+  @Override
+  protected synchronized void postTrackingEventMethod() {
+    value = 0;
+  }
+
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
new file mode 100644
index 0000000..6e16899
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumFailedJobMetric.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
+import azkaban.event.EventListener;
+import azkaban.execapp.JobRunner;
+import azkaban.executor.Status;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+/** Metric to keep track of number of failed jobs in between the tracking events */
+public class NumFailedJobMetric extends TimeBasedReportingMetric<Integer> implements EventListener {
+  public static final String NUM_FAILED_JOB_METRIC_NAME = "NumFailedJobMetric";
+  private static final String NUM_FAILED_JOB_METRIC_TYPE = "uint16";
+
+  public NumFailedJobMetric(MetricReportManager manager, long interval) {
+    super(NUM_FAILED_JOB_METRIC_NAME, NUM_FAILED_JOB_METRIC_TYPE, 0, manager, interval);
+    logger.debug("Instantiated NumFailedJobMetric");
+  }
+
+  /**
+   * Increments the value for every JOB_FINISHED event whose job ended with FAILED status
+   * {@inheritDoc}
+   * @see azkaban.event.EventListener#handleEvent(azkaban.event.Event)
+   */
+  @Override
+  public synchronized void handleEvent(Event event) {
+    // Check the event type before casting: the runner of another event type need not be a JobRunner
+    if (event.getType() == Type.JOB_FINISHED) {
+      JobRunner runner = (JobRunner) event.getRunner();
+      if (runner != null && runner.getStatus().equals(Status.FAILED)) {
+        value = value + 1;
+      }
+    }
+  }
+
+  @Override
+  protected void preTrackingEventMethod() {
+    // Nothing to finalize before tracking event
+  }
+
+  @Override
+  protected synchronized void postTrackingEventMethod() {
+    value = 0; // start counting afresh once the tracking event is done
+  }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
new file mode 100644
index 0000000..5036078
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumQueuedFlowMetric.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.metric.MetricReportManager;
+import azkaban.metric.TimeBasedReportingMetric;
+
+public class NumQueuedFlowMetric extends TimeBasedReportingMetric<Integer> {
+  public static final String NUM_QUEUED_FLOW_METRIC_NAME = "NumQueuedFlowMetric";
+  private static final String NUM_QUEUED_FLOW_METRIC_TYPE = "uint16";
+
+  private final FlowRunnerManager flowManager;
+
+  /**
+   * Creates the metric; its value is read from the flow runner manager in preTrackingEventMethod()
+   * @param flowRunnerManager Flow runner manager
+   * @param manager metric report manager
+   * @param interval reporting interval
+   */
+  public NumQueuedFlowMetric(FlowRunnerManager flowRunnerManager, MetricReportManager manager, long interval) {
+    super(NUM_QUEUED_FLOW_METRIC_NAME, NUM_QUEUED_FLOW_METRIC_TYPE, 0, manager, interval);
+    logger.debug("Instantiated NumQueuedFlowMetric");
+    flowManager = flowRunnerManager;
+  }
+
+  /**
+   * Update value using flow manager
+   * {@inheritDoc}
+   * @see azkaban.metric.TimeBasedReportingMetric#preTrackingEventMethod()
+   */
+  @Override
+  protected synchronized void preTrackingEventMethod() {
+    value = flowManager.getNumQueuedFlows();
+  }
+
+  @Override
+  protected void postTrackingEventMethod() {
+    // nothing to post process
+  }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
index 2ac8654..b611151 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -43,11 +43,16 @@ public class NumRunningFlowMetric extends TimeBasedReportingMetric<Integer> {
/**
* Update value using flow manager
* {@inheritDoc}
- * @see azkaban.metric.TimeBasedReportingMetric#finalizeValue()
+ * @see azkaban.metric.TimeBasedReportingMetric#preTrackingEventMethod()
*/
@Override
- protected synchronized void finalizeValue() {
+ protected synchronized void preTrackingEventMethod() {
value = flowManager.getNumRunningFlows();
}
+ @Override
+ protected void postTrackingEventMethod() {
+ // Nothing to post-process: preTrackingEventMethod() re-reads the value from the flow manager
+ }
+
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
index 58146bc..1ad39d1 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -53,8 +53,13 @@ public class NumRunningJobMetric extends TimeBasedReportingMetric<Integer> imple
}
@Override
- protected synchronized void finalizeValue() {
+ protected synchronized void preTrackingEventMethod() {
// nothing to finalize value is already updated
}
+ @Override
+ protected void postTrackingEventMethod() {
+ // Nothing to post-process; the value is not reset after a tracking event
+ }
+
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index e204088..a06b859 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -248,7 +248,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
}
private Date parseDate(String date) throws ParseException {
- DateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm a");
+ DateFormat format = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", java.util.Locale.US); // input is JS Date.toUTCString() output (English names), so do not parse with the JVM default locale
return format.parse(date);
}
}
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index ffecbcc..636d989 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -36,8 +36,8 @@
var requestURL = '/stats';
var requestData = {
'action': 'metricHistory',
- 'from': $('#datetimebegin').val(),
- 'to' : $('#datetimeend').val(),
+ 'from': new Date($('#datetimebegin').val()).toUTCString(),
+ 'to' : new Date($('#datetimeend').val()).toUTCString(),
'metricName': $('#metricName').val(),
'useStats': $("#useStats").is(':checked')
};