azkaban-aplcache

Trigger Instance and dependency instance status update processor

1/29/2018 10:24:23 PM
3.41.0

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyInstanceProcessor.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyInstanceProcessor.java
new file mode 100644
index 0000000..de97228
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyInstanceProcessor.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class DependencyInstanceProcessor {
+
+  private static final Logger logger = LoggerFactory.getLogger(DependencyInstanceProcessor.class);
+  private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
+
+  @Inject
+  public DependencyInstanceProcessor(final FlowTriggerInstanceLoader depLoader) {
+    this.flowTriggerInstanceLoader = depLoader;
+  }
+
+  /**
+   * Process status update of dependency instance
+   */
+  public void processStatusUpdate(final DependencyInstance depInst) {
+    logger.debug("process status update for " + depInst);
+    //this is blocking call, might offload it to another thread if necessary.
+    this.flowTriggerInstanceLoader.updateDependencyExecutionStatus(depInst);
+  }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
new file mode 100644
index 0000000..b4edfd8
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.flow.Flow;
+import azkaban.flow.FlowUtils;
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.project.Project;
+import azkaban.utils.Emailer;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class TriggerInstanceProcessor {
+
+  private static final Logger logger = LoggerFactory.getLogger(TriggerInstanceProcessor.class);
+  private static final String FAILURE_EMAIL_SUBJECT = "flow trigger for %s "
+      + "cancelled from %s";
+  private static final String FAILURE_EMAIL_BODY = "Your flow trigger cancelled [id: %s]";
+
+  private final ExecutorManager executorManager;
+  private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
+  private final Emailer emailer;
+
+  @Inject
+  public TriggerInstanceProcessor(final ExecutorManager executorManager,
+      final FlowTriggerInstanceLoader flowTriggerInstanceLoader,
+      final Emailer emailer) {
+    Preconditions.checkNotNull(executorManager);
+    Preconditions.checkNotNull(flowTriggerInstanceLoader);
+    Preconditions.checkNotNull(emailer);
+    this.emailer = emailer;
+    this.executorManager = executorManager;
+    this.flowTriggerInstanceLoader = flowTriggerInstanceLoader;
+  }
+
+  private void executeFlowAndUpdateExecID(final TriggerInstance triggerInst) {
+    try {
+      final Project project = triggerInst.getProject();
+      final Flow flow = FlowUtils.getFlow(project, triggerInst.getFlowId());
+      final ExecutableFlow executableFlow = FlowUtils.createExecutableFlow(project, flow);
+      // execute the flow with default execution option(concurrency option being "ignore
+      // currently running")
+      this.executorManager.submitExecutableFlow(executableFlow, triggerInst.getSubmitUser());
+      triggerInst.setFlowExecId(executableFlow.getExecutionId());
+      this.flowTriggerInstanceLoader.updateAssociatedFlowExecId(triggerInst);
+    } catch (final Exception ex) {
+      logger.error("exception when executing the associate flow and updating flow exec id", ex);
+      //todo chengren311: should we swallow the exception or notify user
+    }
+  }
+
+  private String generateFailureEmailSubject(final TriggerInstance triggerInstance) {
+    final String flowFullName =
+        triggerInstance.getProjectName() + "." + triggerInstance.getFlowId();
+    return String.format(FAILURE_EMAIL_SUBJECT, flowFullName, this.emailer.getAzkabanName());
+  }
+
+  private String generateFailureEmailBody(final TriggerInstance triggerInstance) {
+    final String triggerInstFullName =
+        triggerInstance.getProjectName() + "." + triggerInstance.getFlowId();
+    return String.format(FAILURE_EMAIL_BODY, triggerInstFullName);
+  }
+
+  private void sendFailureEmailIfConfigured(final TriggerInstance triggerInstance) {
+    final List<String> failureEmails = triggerInstance.getFailureEmails();
+    if (!failureEmails.isEmpty()) {
+      this.emailer.sendEmail(failureEmails, generateFailureEmailSubject(triggerInstance),
+          generateFailureEmailBody(triggerInstance));
+    }
+  }
+
+  /**
+   * Process the case where status of trigger instance becomes success
+   */
+  public void processSucceed(final TriggerInstance triggerInst) {
+    logger.debug("process succeed for " + triggerInst);
+    //todo chengren311: publishing Trigger events to Azkaban Project Events page
+    executeFlowAndUpdateExecID(triggerInst);
+  }
+
+  /**
+   * Process the case where status of trigger instance becomes cancelled
+   */
+  public void processTermination(final TriggerInstance triggerInst) {
+    logger.debug("process termination for " + triggerInst);
+    sendFailureEmailIfConfigured(triggerInst);
+  }
+
+  /**
+   * Process the case where a new trigger instance is created
+   */
+  public void processNewInstance(final TriggerInstance triggerInst) {
+    logger.debug("process new instance for " + triggerInst);
+    this.flowTriggerInstanceLoader.uploadTriggerInstance(triggerInst);
+  }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/DependencyInstanceProcessorTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/DependencyInstanceProcessorTest.java
new file mode 100644
index 0000000..ba67286
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/DependencyInstanceProcessorTest.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.flowtrigger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import java.util.Date;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class DependencyInstanceProcessorTest {
+
+  private static FlowTriggerInstanceLoader triggerInstLoader;
+  private static DependencyInstanceProcessor processor;
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    triggerInstLoader = mock(FlowTriggerInstanceLoader.class);
+    doNothing().when(triggerInstLoader).updateDependencyExecutionStatus(any());
+    processor = new DependencyInstanceProcessor(triggerInstLoader);
+  }
+
+
+  @Test
+  public void testStatusUpdate() {
+    final DependencyInstance depInst = new DependencyInstance("dep1", new Date(), null, null, Status
+        .RUNNING, CancellationCause.NONE);
+    processor.processStatusUpdate(depInst);
+    verify(triggerInstLoader).updateDependencyExecutionStatus(depInst);
+  }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
index 71045b0..377be3f 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -19,8 +19,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import azkaban.Constants;
 import azkaban.db.DatabaseOperator;
-import azkaban.flowtrigger.db.FlowTriggerInstanceLoader;
-import azkaban.flowtrigger.db.JdbcFlowTriggerInstanceLoaderImpl;
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl;
 import azkaban.project.DirectoryYamlFlowLoader;
 import azkaban.project.FlowLoaderUtils;
 import azkaban.project.FlowTrigger;
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
new file mode 100644
index 0000000..2a1f288
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.flowtrigger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.project.CronSchedule;
+import azkaban.project.FlowTrigger;
+import azkaban.project.Project;
+import azkaban.utils.Emailer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import org.assertj.core.util.Lists;
+import org.assertj.core.util.Maps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TriggerInstanceProcessorTest {
+
+  private static final String EMAIL = "test@email.com";
+  private static FlowTriggerInstanceLoader triggerInstLoader;
+  private static ExecutorManager executorManager;
+  private static Emailer emailer;
+  private static TriggerInstanceProcessor processor;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    triggerInstLoader = mock(FlowTriggerInstanceLoader.class);
+    executorManager = mock(ExecutorManager.class);
+    when(executorManager.submitExecutableFlow(any(), anyString())).thenReturn("return");
+    emailer = mock(Emailer.class);
+    doNothing().when(emailer).sendEmail(any(), any(), any());
+    processor = new TriggerInstanceProcessor(executorManager, triggerInstLoader, emailer);
+  }
+
+  private static TriggerInstance createTriggerInstance() {
+    final FlowTrigger flowTrigger = new FlowTrigger(
+        new CronSchedule("* * * * ? *"),
+        new ArrayList<>(),
+        Duration.ofMinutes(1)
+    );
+    final Project proj = new Project(1, "proj");
+    final Flow flow = new Flow("flowId");
+    flow.addFailureEmails(Lists.newArrayList(EMAIL));
+    proj.setFlows(Maps.newHashMap("flowId", flow));
+    final List<DependencyInstance> depInstList = new ArrayList<>();
+    return new TriggerInstance("instanceId", flowTrigger, "flowId", 1,
+        "test", depInstList, -1, proj);
+  }
+
+  @Test
+  public void testProcessSucceed() throws ExecutorManagerException {
+    final TriggerInstance triggerInstance = createTriggerInstance();
+    processor.processSucceed(triggerInstance);
+    verify(executorManager).submitExecutableFlow(any(), anyString());
+    verify(triggerInstLoader).updateAssociatedFlowExecId(triggerInstance);
+  }
+
+  @Test
+  public void testProcessTermination() throws ExecutorManagerException {
+    final TriggerInstance triggerInstance = createTriggerInstance();
+    processor.processTermination(triggerInstance);
+    verify(emailer).sendEmail(any(), any(), any());
+  }
+
+  @Test
+  public void testNewInstance() throws ExecutorManagerException {
+    final TriggerInstance triggerInstance = createTriggerInstance();
+    processor.processNewInstance(triggerInstance);
+    verify(triggerInstLoader).uploadTriggerInstance(triggerInstance);
+  }
+}