azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 198(+116 -82)
src/java/azkaban/execapp/JobRunner.java 10(+2 -8)
Details
src/java/azkaban/execapp/FlowRunner.java 198(+116 -82)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 0259777..648acef 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -1,8 +1,24 @@
+/*
+ * Copyright 2013 LinkedIn Corp
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package azkaban.execapp;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -54,7 +70,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private int execId;
private File execDir;
- private ExecutableFlow flow;
+ private final ExecutableFlow flow;
private Thread flowRunnerThread;
private int numJobThreads = 10;
private ExecutionOptions.FailureAction failureAction;
@@ -64,15 +80,14 @@ public class FlowRunner extends EventHandler implements Runnable {
// Properties map
private Map<String, Props> sharedProps = new HashMap<String, Props>();
- private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
+// private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
private Props globalProps;
// private Props commonProps;
private final JobTypeManager jobtypeManager;
private JobRunnerEventListener listener = new JobRunnerEventListener();
- private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
- private Map<String, JobRunner> activeJobRunners = new ConcurrentHashMap<String, JobRunner>();
+ private Set<JobRunner> activeJobRunners = Collections.newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
// Used for pipelining
private Integer pipelineLevel = null;
@@ -336,7 +351,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (flowCancelled) {
try {
logger.info("Flow was force cancelled cleaning up.");
- for(JobRunner activeRunner : activeJobRunners.values()) {
+ for(JobRunner activeRunner : activeJobRunners) {
activeRunner.cancel();
}
@@ -352,19 +367,7 @@ public class FlowRunner extends EventHandler implements Runnable {
executorService.shutdown();
synchronized(mainSyncObj) {
- switch(flow.getStatus()) {
- case FAILED_FINISHING:
- logger.info("Setting flow status to Failed.");
- flow.setStatus(Status.FAILED);
- case FAILED:
- case KILLED:
- case FAILED_SUCCEEDED:
- logger.info("Flow is set to " + flow.getStatus().toString());
- break;
- default:
- flow.setStatus(Status.SUCCEEDED);
- logger.info("Flow is set to " + flow.getStatus().toString());
- }
+ finalizeFlow(flow);
}
}
@@ -399,6 +402,24 @@ public class FlowRunner extends EventHandler implements Runnable {
return false;
}
+ private void finalizeFlow(ExecutableFlowBase flow) {
+ String id = flow == this.flow ? "" : flow.getPrintableId() + " ";
+
+ switch(flow.getStatus()) {
+ case FAILED_FINISHING:
+ logger.info("Setting flow " + id + "status to Failed.");
+ flow.setStatus(Status.FAILED);
+ case FAILED:
+ case KILLED:
+ case FAILED_SUCCEEDED:
+ logger.info("Flow " + id + "is set to " + flow.getStatus().toString());
+ break;
+ default:
+ flow.setStatus(Status.SUCCEEDED);
+ logger.info("Flow " + id + "is set to " + flow.getStatus().toString());
+ }
+ }
+
private void prepareJobProperties(ExecutableNode node) throws IOException {
Props props = null;
// The following is the hiearchical ordering of dependency resolution
@@ -476,16 +497,15 @@ public class FlowRunner extends EventHandler implements Runnable {
node.setStatus(Status.RUNNING);
node.setStartTime(System.currentTimeMillis());
- logger.info("Starting subflow " + node.getId() + ".");
+ logger.info("Starting subflow " + node.getPrintableId() + ".");
}
else {
node.setStatus(Status.QUEUED);
JobRunner runner = createJobRunner(node);
- logger.info("Submitting job " + node.getId() + " to run.");
+ logger.info("Submitting job " + node.getPrintableId() + " to run.");
try {
executorService.submit(runner);
- jobRunners.put(node.getId(), runner);
- activeJobRunners.put(node.getId(), runner);
+ activeJobRunners.add(runner);
} catch (RejectedExecutionException e) {
logger.error(e);
};
@@ -543,7 +563,7 @@ public class FlowRunner extends EventHandler implements Runnable {
Props previousOutput = null;
// Iterate the in nodes again and create the dependencies
for (String dependency : node.getInNodes()) {
- Props output = jobOutputProps.get(dependency);
+ Props output = node.getParentFlow().getExecutableNode(dependency).getOutputProps();
if (output != null) {
output = Props.clone(output);
output.setParent(previousOutput);
@@ -635,7 +655,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
logger.info("Cancelling " + activeJobRunners.size() + " jobs.");
- for (JobRunner runner : activeJobRunners.values()) {
+ for (JobRunner runner : activeJobRunners) {
runner.cancel();
}
@@ -649,60 +669,40 @@ public class FlowRunner extends EventHandler implements Runnable {
public void retryFailures(String user) {
synchronized(mainSyncObj) {
logger.info("Retrying failures invoked by " + user);
- ArrayList<String> failures = new ArrayList<String>();
- for (ExecutableNode node: flow.getExecutableNodes()) {
- if (node.getStatus() == Status.FAILED) {
- failures.add(node.getId());
- }
- else if (node.getStatus() == Status.KILLED) {
- node.setStartTime(-1);
- node.setEndTime(-1);
- node.setStatus(Status.READY);
- }
- }
+ retryFailures(flow);
+
+ flow.setStatus(Status.RUNNING);
+ flow.setUpdateTime(System.currentTimeMillis());
+ flowFailed = false;
- retryJobs(failures, user);
+ updateFlow();
+ interrupt();
}
}
- public void retryJobs(List<String> jobIds, String user) {
- synchronized(mainSyncObj) {
- for (String jobId: jobIds) {
- ExecutableNode node = flow.getExecutableNode(jobId);
- if (node == null) {
- logger.error("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot retry.");
- continue;
- }
-
- if (Status.isStatusFinished(node.getStatus())) {
- // Resets the status and increments the attempt number
- node.resetForRetry();
- reEnableDependents(node);
- logger.info("Re-enabling job " + node.getId() + " attempt " + node.getAttempt());
- }
- else {
- logger.error("Cannot retry job " + jobId + " since it hasn't run yet. User " + user);
- continue;
+ private void retryFailures(ExecutableFlowBase flow) {
+ for (ExecutableNode node: flow.getExecutableNodes()) {
+ if (node instanceof ExecutableFlowBase) {
+ if (node.getStatus() == Status.FAILED || node.getStatus() == Status.FAILED_FINISHING || node.getStatus() == Status.KILLED) {
+ retryFailures((ExecutableFlowBase)node);
}
}
- boolean isFailureFound = false;
- for (ExecutableNode node: flow.getExecutableNodes()) {
- Status nodeStatus = node.getStatus();
- if (nodeStatus == Status.FAILED || nodeStatus == Status.KILLED) {
- isFailureFound = true;
- break;
- }
+ if (node.getStatus() == Status.FAILED) {
+ node.resetForRetry();
+ logger.info("Re-enabling job " + node.getPrintableId() + " attempt " + node.getAttempt());
+ reEnableDependents(node);
}
-
- if (!isFailureFound) {
- flow.setStatus(Status.RUNNING);
- flow.setUpdateTime(System.currentTimeMillis());
- flowFailed = false;
+ else if (node.getStatus() == Status.KILLED) {
+ node.setStartTime(-1);
+ node.setEndTime(-1);
+ node.setStatus(Status.READY);
+ }
+ else if (node.getStatus() == Status.FAILED_FINISHING) {
+ node.setStartTime(-1);
+ node.setEndTime(-1);
+ node.setStatus(Status.READY);
}
-
- updateFlow();
- interrupt();
}
}
@@ -743,24 +743,22 @@ public class FlowRunner extends EventHandler implements Runnable {
ExecutableNode node = runner.getNode();
activeJobRunners.remove(node.getId());
- logger.info("Job Finished " + node.getId() + " with status " + node.getStatus());
- if (runner.getOutputProps() != null) {
- logger.info("Job " + node.getId() + " had output props.");
- jobOutputProps.put(node.getId(), runner.getOutputProps());
+ String id = node.getPrintableId(":");
+ logger.info("Job Finished " + id + " with status " + node.getStatus());
+ if (node.getOutputProps() != null) {
+ logger.info("Job " + id + " had output props.");
}
-
- updateFlow();
-
+
if (node.getStatus() == Status.FAILED) {
// Retry failure if conditions are met.
if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
- logger.info("Job " + node.getId() + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
+ logger.info("Job " + id + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
node.setDelayedExecution(runner.getRetryBackoff());
node.resetForRetry();
}
else {
if (!runner.isCancelled() && runner.getRetries() > 0) {
- logger.info("Job " + node.getId() + " has run out of retry attempts");
+ logger.info("Job " + id + " has run out of retry attempts");
// Setting delayed execution to 0 in case this is manually re-tried.
node.setDelayedExecution(0);
}
@@ -771,7 +769,8 @@ public class FlowRunner extends EventHandler implements Runnable {
// The KILLED status occurs when cancel is invoked. We want to keep this
// status even in failure conditions.
if (flow.getStatus() != Status.KILLED && flow.getStatus() != Status.FAILED) {
- flow.setStatus(Status.FAILED_FINISHING);
+ propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
+
if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
cancel();
@@ -779,15 +778,50 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
}
-
+ finalizeFlowIfFinished(node.getParentFlow());
+ updateFlow();
interrupt();
fireEventListeners(event);
}
}
}
+
+ private void propagateStatus(ExecutableFlowBase base, Status status) {
+ base.setStatus(status);
+ if (base.getParentFlow() != null) {
+ propagateStatus(base.getParentFlow(), status);
+ }
+ }
+
+ private void finalizeFlowIfFinished(ExecutableFlowBase base) {
+ // We let main thread finalize the main flow.
+ if (base == flow) {
+ return;
+ }
+
+ if (base.isFlowFinished()) {
+ Props previousOutput = null;
+ for(String end: base.getEndNodes()) {
+ ExecutableNode node = base.getExecutableNode(end);
+
+ Props output = node.getOutputProps();
+ if (output != null) {
+ output = Props.clone(output);
+ output.setParent(previousOutput);
+ previousOutput = output;
+ }
+ }
+ base.setOutputProps(previousOutput);
+ finalizeFlow(base);
+
+ if (base.getParentFlow() != null) {
+ finalizeFlowIfFinished(base.getParentFlow());
+ }
+ }
+ }
}
-
+
public boolean isCancelled() {
return flowCancelled;
}
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index d707c1f..e56be56 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -484,16 +483,6 @@ public class FlowRunnerManager implements EventListener {
runner.retryFailures(user);
}
- public void retryJobs(int execId, String user, List<String> jobId) throws ExecutorManagerException {
- FlowRunner runner = runningFlows.get(execId);
-
- if (runner == null) {
- throw new ExecutorManagerException("Execution " + execId + " is not running.");
- }
-
- runner.retryJobs(jobId, user);
- }
-
public ExecutableFlow getExecutableFlow(int execId) {
FlowRunner runner = runningFlows.get(execId);
if (runner == null) {
src/java/azkaban/execapp/JobRunner.java 10(+2 -8)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index f77f1b2..bc24043 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -54,7 +54,6 @@ public class JobRunner extends EventHandler implements Runnable {
private ExecutorLoader loader;
private Props props;
- private Props outputProps;
private ExecutableNode node;
private File workingDir;
@@ -151,7 +150,7 @@ public class JobRunner extends EventHandler implements Runnable {
// Create file appender
String id = this.jobId;
if (node.getExecutableFlow() != node.getParentFlow()) {
- id = node.getParentFlow().getNestedId("._.");
+ id = node.getPrintableId("._.");
}
String logName = createLogFileName(this.executionId, id, node.getAttempt());
@@ -403,8 +402,7 @@ public class JobRunner extends EventHandler implements Runnable {
node.setStatus(Status.SUCCEEDED);
if (job != null) {
- outputProps = job.getJobGeneratedProperties();
- node.setOutputProps(outputProps);
+ node.setOutputProps(job.getJobGeneratedProperties());
}
}
@@ -445,10 +443,6 @@ public class JobRunner extends EventHandler implements Runnable {
return node.getStatus();
}
- public Props getOutputProps() {
- return outputProps;
- }
-
private void logError(String message) {
if (logger != null) {
logger.error(message);
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index 10db20f..34bff5d 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -78,14 +78,6 @@ public class ExecutableFlowBase extends ExecutableNode {
return flowId;
}
- public String getNestedId(String delimiter) {
- if (this.getParentFlow() != null) {
- return this.getParentFlow().getNestedId(delimiter) + delimiter + getId();
- }
-
- return getId();
- }
-
protected void setFlow(Project project, Flow flow) {
this.flowId = flow.getId();
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 6f41c56..e0f6522 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -241,6 +241,17 @@ public class ExecutableNode {
return array;
}
+ public String getPrintableId() {
+ return getPrintableId(":");
+ }
+
+ public String getPrintableId(String delimiter) {
+ if (this.getParentFlow() instanceof ExecutableFlow) {
+ return getId();
+ }
+ return getParentFlow().getPrintableId(delimiter) + delimiter + getId();
+ }
+
public Map<String,Object> toObject() {
Map<String,Object> mapObj = new HashMap<String,Object>();
fillMapFromExecutable(mapObj);
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 9ef6722..d354e65 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -379,7 +379,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
// if the main flow is not the parent, then we'll create a composite key for flowID
if (flow != node.getParentFlow()) {
- flowId = node.getParentFlow().getNestedId("+");
+ flowId = flow.getId() + "+" + node.getParentFlow().getPrintableId("+");
}
QueryRunner runner = createQueryRunner();
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index e0d5e85..4cf171a 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -486,6 +486,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
* @param exFlow
* @throws ServletException
*/
+ @SuppressWarnings("unused")
private void ajaxFetchJobMetaData(HttpServletRequest req,
HttpServletResponse resp, HashMap<String, Object> ret, User user,
ExecutableFlow exFlow) throws ServletException {
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 0698483..8ee41e7 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -74,7 +74,7 @@ public class JobRunnerTest {
Assert.assertTrue( node.getEndTime() - node.getStartTime() > 1000);
File logFile = new File(runner.getLogFilePath());
- Props outputProps = runner.getOutputProps();
+ Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps != null);
Assert.assertTrue(logFile.exists());
@@ -105,7 +105,7 @@ public class JobRunnerTest {
Assert.assertTrue(node.getEndTime() - node.getStartTime() > 1000);
File logFile = new File(runner.getLogFilePath());
- Props outputProps = runner.getOutputProps();
+ Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
@@ -140,7 +140,7 @@ public class JobRunnerTest {
Assert.assertTrue( node.getEndTime() - node.getStartTime() < 10);
// Log file and output files should not exist.
- Props outputProps = runner.getOutputProps();
+ Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(runner.getLogFilePath() == null);
Assert.assertTrue(eventCollector.checkOrdering());
@@ -178,7 +178,7 @@ public class JobRunnerTest {
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
// Log file and output files should not exist.
- Props outputProps = runner.getOutputProps();
+ Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(runner.getLogFilePath() == null);
Assert.assertTrue(!runner.isCancelled());
@@ -227,7 +227,7 @@ public class JobRunnerTest {
// Log file and output files should not exist.
File logFile = new File(runner.getLogFilePath());
- Props outputProps = runner.getOutputProps();
+ Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
@@ -264,7 +264,7 @@ public class JobRunnerTest {
Assert.assertTrue(node.getStartTime() - startTime >= 5000);
File logFile = new File(runner.getLogFilePath());
- Props outputProps = runner.getOutputProps();
+ Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps != null);
Assert.assertTrue(logFile.exists());
Assert.assertFalse(runner.isCancelled());
@@ -321,7 +321,7 @@ public class JobRunnerTest {
Assert.assertTrue(runner.isCancelled());
File logFile = new File(runner.getLogFilePath());
- Props outputProps = runner.getOutputProps();
+ Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
@@ -354,9 +354,10 @@ public class JobRunnerTest {
node.setParentFlow(flow);
Props props = createProps(time, fail);
+ node.setInputProps(props);
HashSet<String> proxyUsers = new HashSet<String>();
proxyUsers.add(flow.getSubmitUser());
- JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager);
+ JobRunner runner = new JobRunner(node, workingDir, loader, jobtypeManager);
runner.setLogSettings(logger, "5MB", 4);
runner.addListener(listener);
diff --git a/unit/java/azkaban/test/executor/ExecutableFlowTest.java b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
index 7a2f55d..6b6ed94 100644
--- a/unit/java/azkaban/test/executor/ExecutableFlowTest.java
+++ b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
@@ -1,10 +1,8 @@
package azkaban.test.executor;
import java.io.File;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -20,16 +18,13 @@ import org.junit.Test;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutionAttempt;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.Status;
import azkaban.flow.Flow;
-import azkaban.flow.FlowProps;
import azkaban.project.Project;
import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.JSONUtils;
-import azkaban.utils.Props;
public class ExecutableFlowTest {
private Project project;
@@ -136,6 +131,7 @@ public class ExecutableFlowTest {
testEquals(exFlow, parsedExFlow);
}
+ @SuppressWarnings("rawtypes")
@Test
public void testExecutorFlowUpdates() throws Exception {
Flow flow = project.getFlow("jobe");