azkaban-aplcache

make ExecutableFlow initialization methods common util methods

10/26/2017 6:54:08 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java b/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
index f510eb3..9aaad84 100644
--- a/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
+++ b/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
@@ -16,9 +16,14 @@
 
 package azkaban.flow;
 
+import static java.util.Objects.requireNonNull;
+
+import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.Status;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
 import azkaban.utils.Props;
 import java.util.List;
 import java.util.Map;
@@ -87,4 +92,28 @@ public class FlowUtils {
       }
     }
   }
+
+  public static Project getProject(final ProjectManager projectManager, final int projectId) {
+    final Project project = projectManager.getProject(projectId);
+    if (project == null) {
+      throw new RuntimeException("Error finding the project to execute "
+          + projectId);
+    }
+    return project;
+  }
+
+  public static Flow getFlow(final Project project, final String flowName) {
+    final Project nonNullProj = requireNonNull(project);
+    final Flow flow = nonNullProj.getFlow(flowName);
+    if (flow == null) {
+      throw new RuntimeException("Error finding the flow to execute " + flowName);
+    }
+    return flow;
+  }
+
+  public static ExecutableFlow createExecutableFlow(final Project project, final Flow flow) {
+    final ExecutableFlow exflow = new ExecutableFlow(project, flow);
+    exflow.addAllProxyUsers(project.getProxyUsers());
+    return exflow;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index fff11e6..1f750fc 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -21,6 +21,7 @@ import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
+import azkaban.flow.FlowUtils;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 import azkaban.sla.SlaOption;
@@ -201,24 +202,12 @@ public class ExecuteFlowAction implements TriggerAction {
       throw new Exception("ExecuteFlowAction not properly initialized!");
     }
 
-    final Project project = projectManager.getProject(this.projectId);
-    if (project == null) {
-      logger.error("Project to execute " + this.projectId + " does not exist!");
-      throw new RuntimeException("Error finding the project to execute "
-          + this.projectId);
-    }
+    final Project project = FlowUtils.getProject(projectManager, this.projectId);
+    final Flow flow = FlowUtils.getFlow(project, this.flowName);
 
-    final Flow flow = project.getFlow(this.flowName);
-    if (flow == null) {
-      logger.error("Flow " + this.flowName + " cannot be found in project "
-          + project.getName());
-      throw new RuntimeException("Error finding the flow to execute "
-          + this.flowName);
-    }
+    final ExecutableFlow exflow = FlowUtils.createExecutableFlow(project, flow);
 
-    final ExecutableFlow exflow = new ExecutableFlow(project, flow);
     exflow.setSubmitUser(this.submitUser);
-    exflow.addAllProxyUsers(project.getProxyUsers());
 
     if (this.executionOptions == null) {
       this.executionOptions = new ExecutionOptions();
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
index 80a368a..fd5cc25 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -34,7 +34,7 @@ public class ExecutableFlowPriorityComparatorTest {
   private ExecutableFlow createExecutableFlow(final String flowName, final int priority,
       final long updateTime, final int executionId) throws IOException {
     final ExecutableFlow execFlow =
-        TestUtils.createExecutableFlow("exectest1", flowName);
+        TestUtils.createTestExecutableFlow("exectest1", flowName);
 
     execFlow.setUpdateTime(updateTime);
     execFlow.setExecutionId(executionId);
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index ea4cc79..65942e9 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -86,7 +86,7 @@ public class ExecutionFlowDaoTest {
   }
 
   private ExecutableFlow createTestFlow() throws Exception {
-    return TestUtils.createExecutableFlow("exectest1", "exec1");
+    return TestUtils.createTestExecutableFlow("exectest1", "exec1");
   }
 
   @Test
@@ -178,7 +178,7 @@ public class ExecutionFlowDaoTest {
     final ExecutableFlow flow = createTestFlow();
     flow.setStatus(Status.PREPARING);
     this.executionFlowDao.uploadExecutableFlow(flow);
-    final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
     flow2.setStatus(Status.PREPARING);
     this.executionFlowDao.uploadExecutableFlow(flow2);
 
@@ -197,7 +197,7 @@ public class ExecutionFlowDaoTest {
     final String host = "localhost";
     final int port = 12345;
     final Executor executor = this.executorDao.addExecutor(host, port);
-    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     this.executionFlowDao.uploadExecutableFlow(flow);
     this.assignExecutor.assignExecutor(executor.getId(), flow.getExecutionId());
 
@@ -212,7 +212,7 @@ public class ExecutionFlowDaoTest {
   /* Test exception when assigning a non-existent executor to a flow */
   @Test
   public void testAssignExecutorInvalidExecutor() throws Exception {
-    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     this.executionFlowDao.uploadExecutableFlow(flow);
 
     // Since we haven't inserted any executors, 1 should be non-existent executor id.
@@ -241,13 +241,13 @@ public class ExecutionFlowDaoTest {
   public void testFetchActiveFlowsExecutorAssigned() throws Exception {
 
     // Upload flow1, executor assigned
-    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     this.executionFlowDao.uploadExecutableFlow(flow1);
     final Executor executor = this.executorDao.addExecutor("test", 1);
     this.assignExecutor.assignExecutor(executor.getId(), flow1.getExecutionId());
 
     // Upload flow2, executor not assigned
-    final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
     this.executionFlowDao.uploadExecutableFlow(flow2);
 
     final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
@@ -262,7 +262,7 @@ public class ExecutionFlowDaoTest {
 
   @Test
   public void testFetchActiveFlowsStatusChanged() throws Exception {
-    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     this.executionFlowDao.uploadExecutableFlow(flow1);
     final Executor executor = this.executorDao.addExecutor("test", 1);
     this.assignExecutor.assignExecutor(executor.getId(), flow1.getExecutionId());
@@ -306,7 +306,7 @@ public class ExecutionFlowDaoTest {
   @Test
   public void testUploadAndFetchExecutableNode() throws Exception {
 
-    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     flow.setExecutionId(10);
 
     final File jobFile = ExecutionsTestUtil.getFlowFile("exectest1", "job10.job");
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 086c76f..3355936 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -207,9 +207,9 @@ public class ExecutorManagerTest {
   @Test
   public void testQueuedFlows() throws Exception {
     final ExecutorManager manager = createMultiExecutorManagerInstance();
-    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     flow1.setExecutionId(1);
-    final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
     flow2.setExecutionId(2);
 
     final User testUser = TestUtils.getTestUser();
@@ -241,7 +241,7 @@ public class ExecutorManagerTest {
   @Test(expected = ExecutorManagerException.class)
   public void testDuplicateQueuedFlows() throws Exception {
     final ExecutorManager manager = createMultiExecutorManagerInstance();
-    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     flow1.getExecutionOptions().setConcurrentOption(
         ExecutionOptions.CONCURRENT_OPTION_SKIP);
 
@@ -257,7 +257,7 @@ public class ExecutorManagerTest {
   @Test
   public void testKillQueuedFlow() throws Exception {
     final ExecutorManager manager = createMultiExecutorManagerInstance();
-    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     final User testUser = TestUtils.getTestUser();
     manager.submitExecutableFlow(flow1, testUser.getUserId());
 
@@ -273,7 +273,7 @@ public class ExecutorManagerTest {
   @Test
   public void testNotFoundFlows() throws Exception {
     testSetUpForRunningFlows();
-    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
 
     mockUpdateResponse(ImmutableMap.of(ConnectorParams.RESPONSE_UPDATED_FLOWS,
@@ -302,7 +302,7 @@ public class ExecutorManagerTest {
   @Test
   public void testSubmitFlows() throws Exception {
     testSetUpForRunningFlows();
-    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     this.manager.submitExecutableFlow(flow1, this.user.getUserId());
     verify(this.loader).uploadExecutableFlow(flow1);
     verify(this.loader).addActiveExecutableReference(any());
@@ -374,8 +374,8 @@ public class ExecutorManagerTest {
     when(this.loader.fetchActiveExecutors()).thenReturn(executors);
     this.manager = createExecutorManager();
 
-    this.flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
-    this.flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    this.flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+    this.flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
     this.flow1.setExecutionId(1);
     this.flow2.setExecutionId(2);
     final ExecutionReference ref1 =
diff --git a/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
index f00a807..181494a 100644
--- a/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
@@ -66,15 +66,15 @@ public class NumExecutionsDaoTest {
 
   @Test
   public void testFetchNumExecutableFlows() throws Exception {
-    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     flow1.setStatus(Status.PREPARING);
     this.executionFlowDao.uploadExecutableFlow(flow1);
 
-    final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
     flow2.setStatus(Status.RUNNING);
     this.executionFlowDao.uploadExecutableFlow(flow2);
 
-    final ExecutableFlow flow2b = TestUtils.createExecutableFlow("exectest1", "exec2");
+    final ExecutableFlow flow2b = TestUtils.createTestExecutableFlow("exectest1", "exec2");
     flow2b.setStatus(Status.FAILED);
     this.executionFlowDao.uploadExecutableFlow(flow2b);
 
diff --git a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
index f30a4fc..d94469e 100644
--- a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
@@ -32,7 +32,7 @@ public class QueuedExecutionsTest {
    */
   private Pair<ExecutionReference, ExecutableFlow> createExecutablePair(
       final String flowName, final int execId) throws IOException {
-    final ExecutableFlow execFlow = TestUtils.createExecutableFlow("exectest1", flowName);
+    final ExecutableFlow execFlow = TestUtils.createTestExecutableFlow("exectest1", flowName);
     execFlow.setExecutionId(execId);
     final ExecutionReference ref = new ExecutionReference(execId);
     return new Pair<>(ref, execFlow);
diff --git a/azkaban-common/src/test/java/azkaban/flow/FlowUtilsTest.java b/azkaban-common/src/test/java/azkaban/flow/FlowUtilsTest.java
new file mode 100644
index 0000000..54a141a
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/flow/FlowUtilsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.flow;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.project.DirectoryFlowLoader;
+import azkaban.project.Project;
+import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class FlowUtilsTest {
+
+
+  private Project project;
+  private Props props;
+
+
+  @Before
+  public void setUp() throws Exception {
+    this.project = new Project(11, "myTestProject");
+    this.props = new Props();
+    final DirectoryFlowLoader loader = new DirectoryFlowLoader(this.props);
+    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded"));
+    Assert.assertEquals(0, loader.getErrors().size());
+    this.project.setFlows(loader.getFlowMap());
+    this.project.setVersion(123);
+  }
+
+
+  @Test
+  public void testGetFlow() throws Exception {
+    final Flow flow = FlowUtils.getFlow(this.project, "jobe");
+    Assert.assertEquals(flow.getId(), "jobe");
+
+    assertThatThrownBy(() -> FlowUtils.getFlow(this.project, "nonexisting"))
+        .isInstanceOf(RuntimeException.class);
+  }
+
+  @Test
+  public void testCreateExecutableFlow() throws Exception {
+    final Flow flow = FlowUtils.getFlow(this.project, "jobe");
+    final ExecutableFlow exFlow = FlowUtils.createExecutableFlow(this.project, flow);
+    Assert.assertEquals(exFlow.getProjectId(), this.project.getId());
+    Assert.assertEquals(exFlow.getFlowId(), flow.getId());
+    Assert.assertEquals(exFlow.getProxyUsers(), this.project.getProxyUsers());
+  }
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
index b92bfdd..6f12b99 100644
--- a/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
@@ -37,7 +37,7 @@ public final class HttpRequestUtilsTest {
 
   /* Helper method to get a test flow and add required properties */
   public static ExecutableFlow createExecutableFlow() throws IOException {
-    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    final ExecutableFlow flow = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     flow.getExecutionOptions().getFlowParameters()
         .put(ExecutionOptions.FLOW_PRIORITY, "1");
     flow.getExecutionOptions().getFlowParameters()
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
index a16e5f6..35bb985 100644
--- a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -37,7 +37,7 @@ public class TestUtils {
   }
 
   /* Helper method to create an ExecutableFlow from serialized description */
-  public static ExecutableFlow createExecutableFlow(final String projectName,
+  public static ExecutableFlow createTestExecutableFlow(final String projectName,
       final String flowName) throws IOException {
     final File jsonFlowFile = ExecutionsTestUtil.getFlowFile(projectName, flowName + ".flow");
     final HashMap<String, Object> flowObj =
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 2c174c5..dd41056 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -924,9 +924,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
       return;
     }
 
-    final ExecutableFlow exflow = new ExecutableFlow(project, flow);
+    final ExecutableFlow exflow = FlowUtils.createExecutableFlow(project, flow);
     exflow.setSubmitUser(user.getUserId());
-    exflow.addAllProxyUsers(project.getProxyUsers());
 
     final ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);
     exflow.setExecutionOptions(options);