Details
diff --git a/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java b/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
index f510eb3..9aaad84 100644
--- a/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
+++ b/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
@@ -16,9 +16,14 @@
package azkaban.flow;
+import static java.util.Objects.requireNonNull;
+
+import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.Status;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
import azkaban.utils.Props;
import java.util.List;
import java.util.Map;
@@ -87,4 +92,28 @@ public class FlowUtils {
}
}
}
+
+ public static Project getProject(final ProjectManager projectManager, final int projectId) {
+ final Project project = projectManager.getProject(projectId);
+ if (project == null) {
+ throw new RuntimeException("Error finding the project to execute "
+ + projectId);
+ }
+ return project;
+ }
+
+ public static Flow getFlow(final Project project, final String flowName) {
+ final Project nonNullProj = requireNonNull(project);
+ final Flow flow = nonNullProj.getFlow(flowName);
+ if (flow == null) {
+ throw new RuntimeException("Error finding the flow to execute " + flowName);
+ }
+ return flow;
+ }
+
+ public static ExecutableFlow createExecutableFlow(final Project project, final Flow flow) {
+ final ExecutableFlow exflow = new ExecutableFlow(project, flow);
+ exflow.addAllProxyUsers(project.getProxyUsers());
+ return exflow;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index fff11e6..1f750fc 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -21,6 +21,7 @@ import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
+import azkaban.flow.FlowUtils;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.sla.SlaOption;
@@ -201,24 +202,12 @@ public class ExecuteFlowAction implements TriggerAction {
throw new Exception("ExecuteFlowAction not properly initialized!");
}
- final Project project = projectManager.getProject(this.projectId);
- if (project == null) {
- logger.error("Project to execute " + this.projectId + " does not exist!");
- throw new RuntimeException("Error finding the project to execute "
- + this.projectId);
- }
+ final Project project = FlowUtils.getProject(projectManager, this.projectId);
+ final Flow flow = FlowUtils.getFlow(project, this.flowName);
- final Flow flow = project.getFlow(this.flowName);
- if (flow == null) {
- logger.error("Flow " + this.flowName + " cannot be found in project "
- + project.getName());
- throw new RuntimeException("Error finding the flow to execute "
- + this.flowName);
- }
+ final ExecutableFlow exflow = FlowUtils.createExecutableFlow(project, flow);
- final ExecutableFlow exflow = new ExecutableFlow(project, flow);
exflow.setSubmitUser(this.submitUser);
- exflow.addAllProxyUsers(project.getProxyUsers());
if (this.executionOptions == null) {
this.executionOptions = new ExecutionOptions();
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
index 80a368a..fd5cc25 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -34,7 +34,7 @@ public class ExecutableFlowPriorityComparatorTest {
private ExecutableFlow createExecutableFlow(final String flowName, final int priority,
final long updateTime, final int executionId) throws IOException {
final ExecutableFlow execFlow =
- TestUtils.createExecutableFlow("exectest1", flowName);
+ TestUtils.createTestExecutableFlow("exectest1", flowName);
execFlow.setUpdateTime(updateTime);
execFlow.setExecutionId(executionId);
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index ea4cc79..65942e9 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -86,7 +86,7 @@ public class ExecutionFlowDaoTest {
}
private ExecutableFlow createTestFlow() throws Exception {
- return TestUtils.createExecutableFlow("exectest1", "exec1");
+ return TestUtils.createTestExecutableFlow("exectest1", "exec1");
}
@Test
@@ -178,7 +178,7 @@ public class ExecutionFlowDaoTest {
final ExecutableFlow flow = createTestFlow();
flow.setStatus(Status.PREPARING);
this.executionFlowDao.uploadExecutableFlow(flow);
- final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
flow2.setStatus(Status.PREPARING);
this.executionFlowDao.uploadExecutableFlow(flow2);
@@ -197,7 +197,7 @@ public class ExecutionFlowDaoTest {
final String host = "localhost";
final int port = 12345;
final Executor executor = this.executorDao.addExecutor(host, port);
- final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
this.executionFlowDao.uploadExecutableFlow(flow);
this.assignExecutor.assignExecutor(executor.getId(), flow.getExecutionId());
@@ -212,7 +212,7 @@ public class ExecutionFlowDaoTest {
/* Test exception when assigning a non-existent executor to a flow */
@Test
public void testAssignExecutorInvalidExecutor() throws Exception {
- final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
this.executionFlowDao.uploadExecutableFlow(flow);
// Since we haven't inserted any executors, 1 should be non-existent executor id.
@@ -241,13 +241,13 @@ public class ExecutionFlowDaoTest {
public void testFetchActiveFlowsExecutorAssigned() throws Exception {
// Upload flow1, executor assigned
- final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
this.executionFlowDao.uploadExecutableFlow(flow1);
final Executor executor = this.executorDao.addExecutor("test", 1);
this.assignExecutor.assignExecutor(executor.getId(), flow1.getExecutionId());
// Upload flow2, executor not assigned
- final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
this.executionFlowDao.uploadExecutableFlow(flow2);
final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
@@ -262,7 +262,7 @@ public class ExecutionFlowDaoTest {
@Test
public void testFetchActiveFlowsStatusChanged() throws Exception {
- final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
this.executionFlowDao.uploadExecutableFlow(flow1);
final Executor executor = this.executorDao.addExecutor("test", 1);
this.assignExecutor.assignExecutor(executor.getId(), flow1.getExecutionId());
@@ -306,7 +306,7 @@ public class ExecutionFlowDaoTest {
@Test
public void testUploadAndFetchExecutableNode() throws Exception {
- final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
flow.setExecutionId(10);
final File jobFile = ExecutionsTestUtil.getFlowFile("exectest1", "job10.job");
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 086c76f..3355936 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -207,9 +207,9 @@ public class ExecutorManagerTest {
@Test
public void testQueuedFlows() throws Exception {
final ExecutorManager manager = createMultiExecutorManagerInstance();
- final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
flow1.setExecutionId(1);
- final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
flow2.setExecutionId(2);
final User testUser = TestUtils.getTestUser();
@@ -241,7 +241,7 @@ public class ExecutorManagerTest {
@Test(expected = ExecutorManagerException.class)
public void testDuplicateQueuedFlows() throws Exception {
final ExecutorManager manager = createMultiExecutorManagerInstance();
- final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
flow1.getExecutionOptions().setConcurrentOption(
ExecutionOptions.CONCURRENT_OPTION_SKIP);
@@ -257,7 +257,7 @@ public class ExecutorManagerTest {
@Test
public void testKillQueuedFlow() throws Exception {
final ExecutorManager manager = createMultiExecutorManagerInstance();
- final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
final User testUser = TestUtils.getTestUser();
manager.submitExecutableFlow(flow1, testUser.getUserId());
@@ -273,7 +273,7 @@ public class ExecutorManagerTest {
@Test
public void testNotFoundFlows() throws Exception {
testSetUpForRunningFlows();
- final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
mockUpdateResponse(ImmutableMap.of(ConnectorParams.RESPONSE_UPDATED_FLOWS,
@@ -302,7 +302,7 @@ public class ExecutorManagerTest {
@Test
public void testSubmitFlows() throws Exception {
testSetUpForRunningFlows();
- final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
this.manager.submitExecutableFlow(flow1, this.user.getUserId());
verify(this.loader).uploadExecutableFlow(flow1);
verify(this.loader).addActiveExecutableReference(any());
@@ -374,8 +374,8 @@ public class ExecutorManagerTest {
when(this.loader.fetchActiveExecutors()).thenReturn(executors);
this.manager = createExecutorManager();
- this.flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
- this.flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ this.flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+ this.flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
this.flow1.setExecutionId(1);
this.flow2.setExecutionId(2);
final ExecutionReference ref1 =
diff --git a/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
index f00a807..181494a 100644
--- a/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
@@ -66,15 +66,15 @@ public class NumExecutionsDaoTest {
@Test
public void testFetchNumExecutableFlows() throws Exception {
- final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
flow1.setStatus(Status.PREPARING);
this.executionFlowDao.uploadExecutableFlow(flow1);
- final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
flow2.setStatus(Status.RUNNING);
this.executionFlowDao.uploadExecutableFlow(flow2);
- final ExecutableFlow flow2b = TestUtils.createExecutableFlow("exectest1", "exec2");
+ final ExecutableFlow flow2b = TestUtils.createTestExecutableFlow("exectest1", "exec2");
flow2b.setStatus(Status.FAILED);
this.executionFlowDao.uploadExecutableFlow(flow2b);
diff --git a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
index f30a4fc..d94469e 100644
--- a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
@@ -32,7 +32,7 @@ public class QueuedExecutionsTest {
*/
private Pair<ExecutionReference, ExecutableFlow> createExecutablePair(
final String flowName, final int execId) throws IOException {
- final ExecutableFlow execFlow = TestUtils.createExecutableFlow("exectest1", flowName);
+ final ExecutableFlow execFlow = TestUtils.createTestExecutableFlow("exectest1", flowName);
execFlow.setExecutionId(execId);
final ExecutionReference ref = new ExecutionReference(execId);
return new Pair<>(ref, execFlow);
diff --git a/azkaban-common/src/test/java/azkaban/flow/FlowUtilsTest.java b/azkaban-common/src/test/java/azkaban/flow/FlowUtilsTest.java
new file mode 100644
index 0000000..54a141a
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/flow/FlowUtilsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.flow;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.project.DirectoryFlowLoader;
+import azkaban.project.Project;
+import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class FlowUtilsTest {
+
+
+ private Project project;
+ private Props props;
+
+
+ @Before
+ public void setUp() throws Exception {
+ this.project = new Project(11, "myTestProject");
+ this.props = new Props();
+ final DirectoryFlowLoader loader = new DirectoryFlowLoader(this.props);
+ loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded"));
+ Assert.assertEquals(0, loader.getErrors().size());
+ this.project.setFlows(loader.getFlowMap());
+ this.project.setVersion(123);
+ }
+
+
+ @Test
+ public void testGetFlow() throws Exception {
+ final Flow flow = FlowUtils.getFlow(this.project, "jobe");
+ Assert.assertEquals(flow.getId(), "jobe");
+
+ assertThatThrownBy(() -> FlowUtils.getFlow(this.project, "nonexisting"))
+ .isInstanceOf(RuntimeException.class);
+ }
+
+ @Test
+ public void testCreateExecutableFlow() throws Exception {
+ final Flow flow = FlowUtils.getFlow(this.project, "jobe");
+ final ExecutableFlow exFlow = FlowUtils.createExecutableFlow(this.project, flow);
+ Assert.assertEquals(exFlow.getProjectId(), this.project.getId());
+ Assert.assertEquals(exFlow.getFlowId(), flow.getId());
+ Assert.assertEquals(exFlow.getProxyUsers(), this.project.getProxyUsers());
+ }
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
index b92bfdd..6f12b99 100644
--- a/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
@@ -37,7 +37,7 @@ public final class HttpRequestUtilsTest {
/* Helper method to get a test flow and add required properties */
public static ExecutableFlow createExecutableFlow() throws IOException {
- final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
flow.getExecutionOptions().getFlowParameters()
.put(ExecutionOptions.FLOW_PRIORITY, "1");
flow.getExecutionOptions().getFlowParameters()
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
index a16e5f6..35bb985 100644
--- a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -37,7 +37,7 @@ public class TestUtils {
}
/* Helper method to create an ExecutableFlow from serialized description */
- public static ExecutableFlow createExecutableFlow(final String projectName,
+ public static ExecutableFlow createTestExecutableFlow(final String projectName,
final String flowName) throws IOException {
final File jsonFlowFile = ExecutionsTestUtil.getFlowFile(projectName, flowName + ".flow");
final HashMap<String, Object> flowObj =
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 2c174c5..dd41056 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -924,9 +924,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
- final ExecutableFlow exflow = new ExecutableFlow(project, flow);
+ final ExecutableFlow exflow = FlowUtils.createExecutableFlow(project, flow);
exflow.setSubmitUser(user.getUserId());
- exflow.addAllProxyUsers(project.getProxyUsers());
final ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);
exflow.setExecutionOptions(options);