azkaban-aplcache
Changes
azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java 4(+4 -0)
azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java 166(+119 -47)
azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java 109(+109 -0)
azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowpage.vm 73(+70 -3)
azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm 16(+8 -8)
azkaban-web-server/src/web/js/azkaban/view/exflow.js 121(+92 -29)
azkaban-web-server/src/web/js/azkaban/view/flow.js 102(+91 -11)
Details
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
index 4bb013d..cf0a72c 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
@@ -38,6 +38,30 @@ public class DirectoryFlowLoaderTest {
private Project project;
+ private static File decompressTarBZ2(InputStream is) throws IOException {
+ File outputDir = Files.createTempDir();
+
+ try (TarArchiveInputStream tais = new TarArchiveInputStream(
+ new BZip2CompressorInputStream(is))) {
+ TarArchiveEntry entry;
+ while ((entry = tais.getNextTarEntry()) != null) {
+ if (entry.isDirectory()) {
+ continue;
+ }
+
+ File outputFile = new File(outputDir, entry.getName());
+ File parent = outputFile.getParentFile();
+ if (!parent.exists()) {
+ parent.mkdirs();
+ }
+
+ IOUtils.copy(tais, new FileOutputStream(outputFile));
+ }
+
+ return outputDir;
+ }
+ }
+
@Before
public void setUp() {
this.project = new Project(11, "myTestProject");
@@ -78,30 +102,6 @@ public class DirectoryFlowLoaderTest {
Assert.assertEquals(3, loader.getErrors().size());
}
- private static File decompressTarBZ2(InputStream is) throws IOException {
- File outputDir = Files.createTempDir();
-
- try (TarArchiveInputStream tais = new TarArchiveInputStream(
- new BZip2CompressorInputStream(is))) {
- TarArchiveEntry entry;
- while ((entry = tais.getNextTarEntry()) != null) {
- if (entry.isDirectory()) {
- continue;
- }
-
- File outputFile = new File(outputDir, entry.getName());
- File parent = outputFile.getParentFile();
- if (!parent.exists()) {
- parent.mkdirs();
- }
-
- IOUtils.copy(tais, new FileOutputStream(outputFile));
- }
-
- return outputDir;
- }
- }
-
@Test
public void testMassiveFlow() throws Exception {
final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props());
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
index 5f7939e..81294d4 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/FlowTriggerInstanceLoader.java
@@ -59,4 +59,8 @@ public interface FlowTriggerInstanceLoader {
TriggerInstance getTriggerInstanceById(String triggerInstanceId);
+ TriggerInstance getTriggerInstanceByFlowExecId(int execId);
+
+ Collection<TriggerInstance> getTriggerInstances(int projectId, String flowId, int from, int
+ length);
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 6daca2c..7a3d1cc 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -36,6 +36,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -56,7 +57,6 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
private static final Logger logger = LoggerFactory
.getLogger(JdbcFlowTriggerInstanceLoaderImpl.class);
-
private static final String[] DEPENDENCY_EXECUTIONS_COLUMNS = {"trigger_instance_id", "dep_name",
"starttime", "endtime", "dep_status", "cancelleation_cause", "project_id", "project_version",
"flow_id", "flow_version", "flow_exec_id"};
@@ -73,25 +73,28 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
("UPDATE %s SET dep_status = ?, endtime = ?, cancelleation_cause = ? WHERE trigger_instance_id = "
+ "? AND dep_name = ? ;", DEPENDENCY_EXECUTION_TABLE);
-
private static final String SELECT_EXECUTIONS_BY_INSTANCE_ID =
String.format("SELECT %s FROM %s WHERE trigger_instance_id = ?",
StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
DEPENDENCY_EXECUTION_TABLE);
+ private static final String SELECT_EXECUTIONS_BY_EXEC_ID =
+ String.format("SELECT %s FROM %s WHERE flow_exec_id = ?",
+ StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
+ DEPENDENCY_EXECUTION_TABLE);
+
private static final String SELECT_ALL_PENDING_EXECUTIONS =
- String
- .format(
- "SELECT %s FROM %s WHERE trigger_instance_id in (SELECT trigger_instance_id FROM %s "
- + "WHERE "
- + "dep_status = %s or dep_status = %s or (dep_status = %s and "
- + "flow_exec_id = %s))",
- StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
- DEPENDENCY_EXECUTION_TABLE,
- DEPENDENCY_EXECUTION_TABLE,
- Status.RUNNING.ordinal(), Status.CANCELLING.ordinal(),
- Status.SUCCEEDED.ordinal(),
- Constants.UNASSIGNED_EXEC_ID);
+ String.format(
+ "SELECT %s FROM %s WHERE trigger_instance_id in (SELECT trigger_instance_id FROM %s "
+ + "WHERE "
+ + "dep_status = %s or dep_status = %s or (dep_status = %s and "
+ + "flow_exec_id = %s))",
+ StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
+ DEPENDENCY_EXECUTION_TABLE,
+ DEPENDENCY_EXECUTION_TABLE,
+ Status.RUNNING.ordinal(), Status.CANCELLING.ordinal(),
+ Status.SUCCEEDED.ordinal(),
+ Constants.UNASSIGNED_EXEC_ID);
private static final String SELECT_ALL_RUNNING_EXECUTIONS =
String.format(
@@ -116,15 +119,24 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
Status.RUNNING.ordinal(),
Status.CANCELLING.ordinal());
+ private static final String SELECT_RECENT_WITH_START_AND_LENGTH = String.format("SELECT %s FROM"
+ + " %s WHERE trigger_instance_id IN (\n"
+ + "SELECT trigger_instance_id FROM (\n"
+ + "SELECT trigger_instance_id, min(starttime) AS trigger_start_time FROM %s"
+ + " WHERE project_id = ? AND flow_id = ? GROUP BY "
+ + "trigger_instance_id ORDER BY trigger_start_time DESC\n"
+ + "LIMIT ? OFFSET ?) AS tmp);", StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
+ DEPENDENCY_EXECUTION_TABLE, DEPENDENCY_EXECUTION_TABLE);
private static final String UPDATE_DEPENDENCY_FLOW_EXEC_ID = String.format("UPDATE %s SET "
+ "flow_exec_id "
+ "= ? WHERE trigger_instance_id = ? AND dep_name = ? ;", DEPENDENCY_EXECUTION_TABLE);
- private final DatabaseOperator dbOperator;
private final ProjectLoader projectLoader;
+ private final DatabaseOperator dbOperator;
private final ProjectManager projectManager;
+
@Inject
public JdbcFlowTriggerInstanceLoaderImpl(final DatabaseOperator databaseOperator,
final ProjectLoader projectLoader, final ProjectManager projectManager) {
@@ -137,11 +149,12 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
public Collection<TriggerInstance> getIncompleteTriggerInstances() {
final Collection<TriggerInstance> unfinished = new ArrayList<>();
try {
- final Collection<TriggerInstance> triggerInstances = this.dbOperator
- .query(SELECT_ALL_PENDING_EXECUTIONS, new TriggerInstanceHandler());
+ final Collection<TriggerInstance> triggerInsts = this.dbOperator
+ .query(SELECT_ALL_PENDING_EXECUTIONS,
+ new TriggerInstanceHandler(SORT_MODE.SORT_ON_START_TIME_ASC));
// select incomplete trigger instances
- for (final TriggerInstance triggerInst : triggerInstances) {
+ for (final TriggerInstance triggerInst : triggerInsts) {
if (!Status.isDone(triggerInst.getStatus()) || (triggerInst.getStatus() == Status.SUCCEEDED
&& triggerInst.getFlowExecId() == Constants.UNASSIGNED_EXEC_ID)) {
unfinished.add(triggerInst);
@@ -268,7 +281,8 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
public Collection<TriggerInstance> getRecentlyFinished(final int limit) {
final String query = String.format(SELECT_RECENTLY_FINISHED, limit);
try {
- return this.dbOperator.query(query, new TriggerInstanceHandler());
+ return this.dbOperator
+ .query(query, new TriggerInstanceHandler(SORT_MODE.SORT_ON_START_TIME_ASC));
} catch (final SQLException ex) {
handleSQLException(ex);
}
@@ -281,27 +295,15 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
//todo chengren311:
// 1. add index for the execution_dependencies table to accelerate selection.
// 2. implement purging mechanism to keep reasonable amount of historical executions in db.
- return this.dbOperator.query(SELECT_ALL_RUNNING_EXECUTIONS, new TriggerInstanceHandler());
+ return this.dbOperator.query(SELECT_ALL_RUNNING_EXECUTIONS, new TriggerInstanceHandler
+ (SORT_MODE.SORT_ON_START_TIME_ASC));
} catch (final SQLException ex) {
handleSQLException(ex);
}
return Collections.emptyList();
}
- /**
- * Retrieve a trigger instance given an instance id. Flow trigger properties will also be
- * populated into the returned trigger instance.
- */
- @Override
- public TriggerInstance getTriggerInstanceById(final String triggerInstanceId) {
- TriggerInstance triggerInstance = null;
- try {
- final Collection<TriggerInstance> res = this.dbOperator
- .query(SELECT_EXECUTIONS_BY_INSTANCE_ID, new TriggerInstanceHandler(), triggerInstanceId);
- triggerInstance = !res.isEmpty() ? res.iterator().next() : null;
- } catch (final SQLException ex) {
- handleSQLException(ex);
- }
+ private void populateFlowTriggerProperties(final TriggerInstance triggerInstance) {
if (triggerInstance != null) {
final int projectId = triggerInstance.getProject().getId();
final int projectVersion = triggerInstance.getProject().getVersion();
@@ -318,7 +320,7 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
triggerInstance.setFlowTrigger(flowTrigger);
}
} else {
- logger.error("Unable to find flow file for " + triggerInstanceId);
+ logger.error("Unable to find flow file for " + triggerInstance);
}
} catch (final Exception ex) {
logger.error("error in getting flow file", ex);
@@ -326,9 +328,81 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
FlowLoaderUtils.cleanUpDir(tempDir);
}
}
+ }
+
+ /**
+ * Retrieve a trigger instance given a flow execution id. Flow trigger properties will
+ * also be populated into the returned trigger instance. If flow exec id is -1 or -2, then
+ * null will be returned.
+ */
+ @Override
+ public TriggerInstance getTriggerInstanceByFlowExecId(final int flowExecId) {
+ if (flowExecId == Constants.FAILED_EXEC_ID || flowExecId == Constants.UNASSIGNED_EXEC_ID) {
+ return null;
+ }
+ TriggerInstance triggerInstance = null;
+ try {
+ final Collection<TriggerInstance> res = this.dbOperator
+ .query(SELECT_EXECUTIONS_BY_EXEC_ID,
+ new TriggerInstanceHandler(SORT_MODE.SORT_ON_START_TIME_ASC), flowExecId);
+ triggerInstance = !res.isEmpty() ? res.iterator().next() : null;
+ } catch (final SQLException ex) {
+ handleSQLException(ex);
+ }
+ populateFlowTriggerProperties(triggerInstance);
+ return triggerInstance;
+ }
+
+ @Override
+ /**
+ * Retrieve sorted trigger instances on start time in descending order
+ * given projectId, flowId, start position and length.
+ * @param projectId
+ * @param flowId
+ * @param from starting position of the range of trigger instance to retrieve
+ * @param length number of consecutive trigger instances to retrieve
+ */
+ public Collection<TriggerInstance> getTriggerInstances(
+ final int projectId, final String flowId, final int from,
+ final int length) {
+
+ try {
+ final Collection<TriggerInstance> res = this.dbOperator
+ .query(SELECT_RECENT_WITH_START_AND_LENGTH, new TriggerInstanceHandler(SORT_MODE
+ .SORT_ON_START_TIME_DESC), projectId,
+ flowId, length, from);
+ return res;
+ } catch (final SQLException ex) {
+ handleSQLException(ex);
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * Retrieve a trigger instance given an instance id. Flow trigger properties will also be
+ * populated into the returned trigger instance.
+ */
+ @Override
+ public TriggerInstance getTriggerInstanceById(final String triggerInstanceId) {
+ TriggerInstance triggerInstance = null;
+ try {
+ final Collection<TriggerInstance> res = this.dbOperator
+ .query(SELECT_EXECUTIONS_BY_INSTANCE_ID,
+ new TriggerInstanceHandler(SORT_MODE.SORT_ON_START_TIME_ASC),
+ triggerInstanceId);
+ triggerInstance = !res.isEmpty() ? res.iterator().next() : null;
+ } catch (final SQLException ex) {
+ handleSQLException(ex);
+ }
+ populateFlowTriggerProperties(triggerInstance);
return triggerInstance;
}
+ private enum SORT_MODE {
+ SORT_ON_START_TIME_DESC,
+ SORT_ON_START_TIME_ASC
+ }
+
public static class FlowConfigID {
private final int projectId;
@@ -394,7 +468,10 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
private class TriggerInstanceHandler implements
ResultSetHandler<Collection<TriggerInstance>> {
- public TriggerInstanceHandler() {
+ private final SORT_MODE mode;
+
+ public TriggerInstanceHandler(final SORT_MODE mode) {
+ this.mode = mode;
}
@Override
@@ -438,20 +515,15 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
.submitUser, entry.getValue(), entry.getKey().flowExecId, entry.getKey().project));
}
- //sort on start time in ascending order
- Collections.sort(res, (o1, o2) -> {
- if (o1.getStartTime() < o2.getStartTime()) {
- return -1;
- } else if (o1.getStartTime() > o2.getStartTime()) {
- return 1;
- } else {
- return 0;
- }
- });
-
+ if (this.mode == SORT_MODE.SORT_ON_START_TIME_ASC) {
+ Collections.sort(res, Comparator.comparing(TriggerInstance::getStartTime));
+ } else if (this.mode == SORT_MODE.SORT_ON_START_TIME_DESC) {
+ Collections.sort(res, Comparator.comparing(TriggerInstance::getStartTime).reversed());
+ }
return res;
}
+
private class TriggerInstKey {
String triggerInstId;
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index cb6dd5b..3ff5f3f 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -72,7 +72,7 @@ import org.slf4j.LoggerFactory;
public class FlowTriggerService {
private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(1);
- private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 20;
+ private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 50;
private static final int CANCEL_EXECUTOR_POOL_SIZE = 32;
private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
private final ExecutorService singleThreadExecutorService;
@@ -189,6 +189,10 @@ public class FlowTriggerService {
return this.flowTriggerInstanceLoader.getTriggerInstanceById(triggerInstanceId);
}
+ public TriggerInstance findTriggerInstanceByExecId(final int flowExecId) {
+ return this.flowTriggerInstanceLoader.getTriggerInstanceByFlowExecId(flowExecId);
+ }
+
private boolean isDoneButFlowNotExecuted(final TriggerInstance triggerInstance) {
return triggerInstance.getStatus() == Status.SUCCEEDED && triggerInstance.getFlowExecId() ==
Constants.UNASSIGNED_EXEC_ID;
@@ -540,4 +544,9 @@ public class FlowTriggerService {
this.triggerProcessor.shutdown();
this.triggerPluginManager.shutdown();
}
+
+ public Collection<TriggerInstance> getTriggerInstances(final int projectId, final String flowId,
+ final int from, final int length) {
+ return this.flowTriggerInstanceLoader.getTriggerInstances(projectId, flowId, from, length);
+ }
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
index 09ea6b8..afbfd99 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
@@ -113,8 +113,8 @@ public class FlowTriggerScheduler {
* Retrieve the list of scheduled flow triggers from quartz database
*/
public List<ScheduledFlowTrigger> getScheduledFlowTriggerJobs() {
- final Scheduler quartzScheduler = this.scheduler.getScheduler();
try {
+ final Scheduler quartzScheduler = this.scheduler.getScheduler();
final List<String> groupNames = quartzScheduler.getJobGroupNames();
final List<ScheduledFlowTrigger> flowTriggerJobDetails = new ArrayList<>();
@@ -131,7 +131,7 @@ public class FlowTriggerScheduler {
.get(FlowTriggerQuartzJob.FLOW_TRIGGER);
final String submitUser = jobDataMap.getString(FlowTriggerQuartzJob.SUBMIT_USER);
final List<? extends Trigger> quartzTriggers = quartzScheduler.getTriggersOfJob(jobKey);
- scheduledFlowTrigger = new ScheduledFlowTrigger(
+ scheduledFlowTrigger = new ScheduledFlowTrigger(projectId,
this.projectManager.getProject(projectId).getName(),
flowId, flowTrigger, submitUser, quartzTriggers.isEmpty() ? null
: quartzTriggers.get(0));
@@ -143,7 +143,7 @@ public class FlowTriggerScheduler {
flowTriggerJobDetails.add(scheduledFlowTrigger);
}
return flowTriggerJobDetails;
- } catch (final SchedulerException ex) {
+ } catch (final Exception ex) {
logger.error("unable to get scheduled flow triggers", ex);
return new ArrayList<>();
}
@@ -176,15 +176,17 @@ public class FlowTriggerScheduler {
public static class ScheduledFlowTrigger {
+ private final int projectId;
private final String projectName;
private final String flowId;
private final FlowTrigger flowTrigger;
private final Trigger quartzTrigger;
private final String submitUser;
- public ScheduledFlowTrigger(final String projectName, final String flowId,
+ public ScheduledFlowTrigger(final int projectId, final String projectName, final String flowId,
final FlowTrigger flowTrigger, final String submitUser,
final Trigger quartzTrigger) {
+ this.projectId = projectId;
this.projectName = projectName;
this.flowId = flowId;
this.flowTrigger = flowTrigger;
@@ -192,6 +194,10 @@ public class FlowTriggerScheduler {
this.quartzTrigger = quartzTrigger;
}
+ public int getProjectId() {
+ return this.projectId;
+ }
+
public String getProjectName() {
return this.projectName;
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 7f2bb6b..6551df9 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -31,6 +31,8 @@ import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.flow.FlowUtils;
+import azkaban.flowtrigger.FlowTriggerService;
+import azkaban.flowtrigger.TriggerInstance;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.scheduler.Schedule;
@@ -70,6 +72,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
private WebMetrics webMetrics;
private ProjectManager projectManager;
+ private FlowTriggerService flowTriggerService;
private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
private UserManager userManager;
@@ -82,6 +85,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
this.projectManager = server.getProjectManager();
this.executorManager = server.getExecutorManager();
this.scheduleManager = server.getScheduleManager();
+ this.flowTriggerService = server.getFlowTriggerService();
// TODO: reallocf fully guicify
this.webMetrics = SERVICE_PROVIDER.getInstance(WebMetrics.class);
}
@@ -95,8 +99,10 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if (hasParam(req, "job")) {
handleExecutionJobDetailsPage(req, resp, session);
} else {
- handleExecutionFlowPage(req, resp, session);
+ handleExecutionFlowPageByExecId(req, resp, session);
}
+ } else if (hasParam(req, "triggerinstanceid")) {
+ handleExecutionFlowPageByTriggerInstanceId(req, resp, session);
} else {
handleExecutionsPage(req, resp, session);
}
@@ -362,39 +368,47 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
page.render();
}
- private void handleExecutionFlowPage(final HttpServletRequest req,
+ private void handleExecutionFlowPageByTriggerInstanceId(final HttpServletRequest req,
final HttpServletResponse resp, final Session session) throws ServletException,
IOException {
final Page page =
newPage(req, resp, session,
"azkaban/webapp/servlet/velocity/executingflowpage.vm");
final User user = session.getUser();
- final int execId = getIntParam(req, "execid");
- page.add("execid", execId);
+ final String triggerInstanceId = getParam(req, "triggerinstanceid");
- ExecutableFlow flow = null;
- try {
- flow = this.executorManager.getExecutableFlow(execId);
- if (flow == null) {
- page.add("errorMsg", "Error loading executing flow " + execId
- + " not found.");
- page.render();
- return;
- }
- } catch (final ExecutorManagerException e) {
- page.add("errorMsg", "Error loading executing flow: " + e.getMessage());
+ final TriggerInstance triggerInst = this.flowTriggerService
+ .findTriggerInstanceById(triggerInstanceId);
+
+ if (triggerInst == null) {
+ page.add("errorMsg", "Error loading trigger instance " + triggerInstanceId
+ + " not found.");
page.render();
return;
}
- final int projectId = flow.getProjectId();
+ page.add("triggerInstanceId", triggerInstanceId);
+ page.add("execid", triggerInst.getFlowExecId());
+
+ final int projectId = triggerInst.getProject().getId();
final Project project =
getProjectPageByPermission(page, projectId, user, Type.READ);
+
if (project == null) {
page.render();
return;
}
+ addExternalLinkLabel(req, page);
+
+ page.add("projectId", project.getId());
+ page.add("projectName", project.getName());
+ page.add("flowid", triggerInst.getFlowId());
+
+ page.render();
+ }
+
+ private void addExternalLinkLabel(final HttpServletRequest req, final Page page) {
final Props props = getApplication().getServerProps();
final String execExternalLinkURL = ExternalLinkUtils.getExternalAnalyzerOnReq(props, req);
@@ -409,6 +423,43 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
page.add("executionExternalLinkLabel", execExternalLinkLabel);
logger.debug("External analyzer label set to : " + execExternalLinkLabel);
}
+ }
+
+ private void handleExecutionFlowPageByExecId(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
+ IOException {
+ final Page page =
+ newPage(req, resp, session,
+ "azkaban/webapp/servlet/velocity/executingflowpage.vm");
+ final User user = session.getUser();
+ final int execId = getIntParam(req, "execid");
+ page.add("execid", execId);
+ page.add("triggerInstanceId", "-1");
+
+ ExecutableFlow flow = null;
+ try {
+ flow = this.executorManager.getExecutableFlow(execId);
+ if (flow == null) {
+ page.add("errorMsg", "Error loading executing flow " + execId
+ + " not found.");
+ page.render();
+ return;
+ }
+ } catch (final ExecutorManagerException e) {
+ page.add("errorMsg", "Error loading executing flow: " + e.getMessage());
+ page.render();
+ return;
+ }
+
+ final int projectId = flow.getProjectId();
+ final Project project =
+ getProjectPageByPermission(page, projectId, user, Type.READ);
+ if (project == null) {
+ page.render();
+ return;
+ }
+
+ addExternalLinkLabel(req, page);
page.add("projectId", project.getId());
page.add("projectName", project.getName());
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
index aa6caf2..ceaacf1 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
@@ -25,6 +25,7 @@ import azkaban.project.ProjectManager;
import azkaban.server.session.Session;
import azkaban.user.Permission.Type;
import azkaban.webapp.AzkabanWebServer;
+import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -83,6 +84,7 @@ public class FlowTriggerInstanceServlet extends LoginAbstractAzkabanServlet {
final HashMap<String, Object> ret = new HashMap<>();
final String ajaxName = getParam(req, "ajax");
+ //todo chengren311: add permission control
if (ajaxName.equals("fetchRunningTriggers")) {
ajaxFetchRunningTriggerInstances(ret);
} else if (ajaxName.equals("killRunningTrigger")) {
@@ -99,6 +101,33 @@ public class FlowTriggerInstanceServlet extends LoginAbstractAzkabanServlet {
} else {
ret.put("error", "please specify a valid running trigger instance id");
}
+ } else if (ajaxName.equals("fetchTriggerStatus")) {
+ if (hasParam(req, "triggerinstid")) {
+ final String triggerInstanceId = getParam(req, "triggerinstid");
+ ajaxFetchTriggerInstanceByTriggerInstId(triggerInstanceId, session, ret);
+ } else if (hasParam(req, "execid")) {
+ final int execId = getIntParam(req, "execid");
+ ajaxFetchTriggerInstanceByExecId(execId, session, ret);
+ } else {
+ ret.put("error", "please specify a valid trigger instance id or flow execution id");
+ }
+ } else if (ajaxName.equals("fetchTriggerInstances")) {
+ if (hasParam(req, "project") && hasParam(req, "flow")) {
+ final String projectName = getParam(req, "project");
+ final String flowId = getParam(req, "flow");
+ final Project project = this.projectManager.getProject(projectName);
+ if (project == null) {
+ ret.put("error", "please specify a valid project name");
+ return;
+ }
+ if (!hasPermission(project, session.getUser(), Type.READ)) {
+ ret.put("error", "Permission denied. Need READ access.");
+ return;
+ }
+ ajaxFetchTriggerInstances(project.getId(), flowId, ret, req);
+ } else {
+ ret.put("error", "please specify project id and flow id");
+ }
}
if (ret != null) {
@@ -106,6 +135,38 @@ public class FlowTriggerInstanceServlet extends LoginAbstractAzkabanServlet {
}
}
+ private void ajaxFetchTriggerInstances(
+ final int projectId,
+ final String flowId,
+ final HashMap<String, Object> ret,
+ final HttpServletRequest req)
+ throws ServletException {
+
+ final int from = Integer.valueOf(getParam(req, "start"));
+ final int length = Integer.valueOf(getParam(req, "length"));
+
+ final Collection<TriggerInstance> triggerInstances = this.triggerService
+ .getTriggerInstances(projectId, flowId, from, length);
+
+ ret.put("flow", flowId);
+ ret.put("total", triggerInstances.size());
+ ret.put("from", from);
+ ret.put("length", length);
+
+ final List<Object> history = new ArrayList<>();
+ for (final TriggerInstance instance : triggerInstances) {
+ final HashMap<String, Object> triggerInfo = new HashMap<>();
+ triggerInfo.put("instanceId", instance.getId());
+ triggerInfo.put("submitUser", instance.getSubmitUser());
+ triggerInfo.put("startTime", instance.getStartTime());
+ triggerInfo.put("endTime", instance.getEndTime());
+ triggerInfo.put("status", instance.getStatus().toString());
+ history.add(triggerInfo);
+ }
+
+ ret.put("executions", history);
+ }
+
private void loadTriggerProperties(final String triggerInstanceId,
final HashMap<String, Object> ret) {
final TriggerInstance triggerInstance = this.triggerService
@@ -117,6 +178,54 @@ public class FlowTriggerInstanceServlet extends LoginAbstractAzkabanServlet {
}
}
+
+ private void wrapTriggerInst(final TriggerInstance triggerInst,
+ final HashMap<String, Object> ret) {
+ final List<Map<String, Object>> dependencyOutput = new ArrayList<>();
+ for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+ final Map<String, Object> depMap = new HashMap<>();
+ depMap.put("triggerInstanceId", depInst.getTriggerInstance().getId());
+ depMap.put("dependencyName", depInst.getDepName());
+ depMap.put("dependencyType", depInst.getTriggerInstance().getFlowTrigger()
+ .getDependencyByName(depInst.getDepName()).getType());
+ depMap.put("dependencyStartTime", depInst.getStartTime());
+ depMap.put("dependencyEndTime", depInst.getEndTime());
+ depMap.put("dependencyStatus", depInst.getStatus());
+ depMap.put("dependencyCancelCause", depInst.getCancellationCause());
+ depMap.put("dependencyConfig", depInst.getTriggerInstance().getFlowTrigger()
+ .getDependencyByName(depInst.getDepName()));
+ dependencyOutput.add(depMap);
+ }
+ ret.put("items", dependencyOutput);
+
+ ret.put("triggerId", triggerInst.getId());
+ ret.put("triggerSubmitter", triggerInst.getSubmitUser());
+ ret.put("triggerStartTime", triggerInst.getStartTime());
+ ret.put("triggerEndTime", triggerInst.getEndTime());
+ ret.put("triggerStatus", triggerInst.getStatus());
+ final String flowTriggerJson = new GsonBuilder().setPrettyPrinting().create()
+ .toJson(triggerInst.getFlowTrigger());
+ ret.put("triggerProps", flowTriggerJson);
+ }
+
+ private void ajaxFetchTriggerInstanceByExecId(final int execId, final Session session,
+ final HashMap<String, Object> ret) {
+ final TriggerInstance triggerInst = this.triggerService
+ .findTriggerInstanceByExecId(execId);
+ if (triggerInst != null) {
+ wrapTriggerInst(triggerInst, ret);
+ }
+ }
+
+ private void ajaxFetchTriggerInstanceByTriggerInstId(final String triggerInstanceId,
+ final Session session, final HashMap<String, Object> ret) {
+ final TriggerInstance triggerInst = this.triggerService
+ .findTriggerInstanceById(triggerInstanceId);
+ if (triggerInst != null) {
+ wrapTriggerInst(triggerInst, ret);
+ }
+ }
+
private void ajaxKillTriggerInstance(final String triggerInstanceId, final Session session,
final HashMap<String, Object> ret) {
final TriggerInstance triggerInst = this.triggerService
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
index 4aef3bc..7ea7bc6 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
@@ -17,9 +17,12 @@
package azkaban.webapp.servlet;
import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
+import azkaban.flowtrigger.quartz.FlowTriggerScheduler.ScheduledFlowTrigger;
import azkaban.server.session.Session;
import azkaban.webapp.AzkabanWebServer;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@@ -40,7 +43,53 @@ public class FlowTriggerServlet extends LoginAbstractAzkabanServlet {
@Override
protected void handleGet(final HttpServletRequest req, final HttpServletResponse resp,
final Session session) throws ServletException, IOException {
- handlePage(req, resp, session);
+ if (hasParam(req, "ajax")) {
+ handleAJAXAction(req, resp, session);
+ } else {
+ handlePage(req, resp, session);
+ }
+ }
+
+ private void ajaxFetchTrigger(final int projectId, final String flowId, final Session session,
+ final HashMap<String,
+ Object> ret) {
+ final ScheduledFlowTrigger res = this.scheduler
+ .getScheduledFlowTriggerJobs().stream().filter(
+ scheduledFlowTrigger -> scheduledFlowTrigger.getFlowId().equals(flowId)
+ && scheduledFlowTrigger.getProjectId
+ () == projectId).findFirst().orElse(null);
+
+ if (res != null) {
+ final Map<String, Object> jsonObj = new HashMap<>();
+ jsonObj.put("cronExpression", res.getFlowTrigger().getSchedule().getCronExpression());
+ jsonObj.put("submitUser", res.getSubmitUser());
+ jsonObj.put("firstSchedTime",
+ utils.formatDateTime(res.getQuartzTrigger().getStartTime().getTime()));
+ jsonObj.put("nextExecTime",
+ utils.formatDateTime(res.getQuartzTrigger().getNextFireTime().getTime()));
+ jsonObj.put("maxWaitMin", res.getFlowTrigger().getMaxWaitDuration().toMinutes());
+ if (!res.getFlowTrigger().getDependencies().isEmpty()) {
+ jsonObj.put("dependencies", res.getDependencyListJson());
+ }
+ ret.put("flowTrigger", jsonObj);
+ }
+ }
+
+ private void handleAJAXAction(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
+ IOException {
+ final HashMap<String, Object> ret = new HashMap<>();
+ final String ajaxName = getParam(req, "ajax");
+ if (ajaxName.equals("fetchTrigger")) {
+ if (hasParam(req, "projectId") && hasParam(req, "flowId")) {
+ final int projectId = getIntParam(req, "projectId");
+ final String flowId = getParam(req, "flowId");
+ ajaxFetchTrigger(projectId, flowId, session, ret);
+ if (ret != null) {
+ this.writeJSON(resp, ret);
+ }
+ }
+ }
}
private void handlePage(final HttpServletRequest req,
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowpage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowpage.vm
index c30daa1..ffca2ac 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -31,6 +31,7 @@
<script type="text/javascript" src="${context}/js/flowstats-no-data.js"></script>
<script type="text/javascript" src="${context}/js/azkaban/view/flow-execution-list.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban/view/flow-trigger-list.js"></script>
<script type="text/javascript" src="${context}/js/azkaban/view/flow-execute-dialog.js"></script>
<script type="text/javascript" src="${context}/js/azkaban/view/flow-stats.js"></script>
<script type="text/javascript" src="${context}/js/azkaban/view/exflow.js"></script>
@@ -40,11 +41,11 @@
var timezone = "${timezone}";
var errorMessage = null;
var successMessage = null;
- ;
var projectId = "${projectId}";
var projectName = "${projectName}";
var flowId = "${flowid}";
var execId = "${execid}";
+ var triggerInstanceId = "${triggerInstanceId}";
</script>
<link rel="stylesheet" type="text/css" href="${context}/css/morris.css"/>
<link rel="stylesheet" type="text/css" href="${context}/css/jquery-ui-1.10.1.custom.css"/>
@@ -68,7 +69,11 @@
<h1>
<a href="${context}/executor?execid=${execid}">
Flow Execution
- <small>$execid <span id="flowStatus">-</span></small>
+ #if (${execid} == "-1" || ${execid} == "-2")
+ <small>not started <span id="flowStatus">-</span></small>
+ #else
+ <small>$execid <span id="flowStatus">-</span></small>
+ #end
</a>
</h1>
</div>
@@ -95,7 +100,8 @@
href="${context}/manager?project=${projectName}"><strong>Project</strong> $projectName
</a></li>
<li><a
- href="${context}/manager?project=${projectName}&flow=${flowid}"><strong>Flow</strong> $flowid
+ href="${context}/manager?project=${projectName}&flow=${flowid}"><strong>Flow</strong>
+ $flowid
</a></li>
<li class="active"><strong>Execution</strong> $execid</li>
</ol>
@@ -110,6 +116,7 @@
<ul class="nav nav-tabs nav-sm" id="headertabs">
<li id="graphViewLink"><a href="#graph">Graph</a></li>
+ <li id="flowTriggerlistViewLink"><a href="#triggerslist">Flow Trigger List</a></li>
<li id="jobslistViewLink"><a href="#jobslist">Job List</a></li>
<li id="flowLogViewLink"><a href="#log">Flow Log</a></li>
<li id="statsViewLink"><a href="#stats">Stats</a></li>
@@ -142,6 +149,66 @@
#parse ("azkaban/webapp/servlet/velocity/flowgraphview.vm")
+ ## Flow Trigger List View
+
+ <div class="container-full" id="flowTriggerListView">
+ <div class="row">
+ <div class="col-xs-12">
+ <table class="table table-bordered table-condensed table-hover executions-table">
+ <thead>
+ <tr>
+ <th>Trigger Instance Id</th>
+ <th>Submitted by</th>
+ <th class="date">Start Time</th>
+ <th class="date">End Time</th>
+ <th class="elapse">Elapsed</th>
+ <th class="status">Status</th>
+ <th class="props">Trigger Properties</th>
+ <th class="action">Action</th>
+ </tr>
+
+ <div class="modal fade" id="dependencyList" tabindex="-1"
+ role="dialog"
+ aria-labelledby="dependencyLabel">
+ <div class="modal-dialog" role="document">
+ <div class="modal-content">
+ <div class="modal-header">
+ <button type="button" class="close" data-dismiss="modal" aria-label="Close">
+ <span aria-hidden="true">×</span></button>
+ <h4 class="modal-title" id="dependencyLabel">Dependencies</h4>
+ </div>
+ <div id="triggerProp" class="modal-body">
+ </div>
+ <div class="modal-footer">
+ <button type="button" class="btn btn-default" data-dismiss="modal">Close
+ </button>
+ </div>
+ </div>
+ </div>
+ </div>
+
+ </thead>
+ <tbody id="triggerBody">
+ </tbody>
+ <thead>
+ <tr>
+ <th>Trigger Instance Id</th>
+ <th>Dependency Name</th>
+ <th class="triggertype">Dependency Type</th>
+ <th class="date">Start Time</th>
+ <th class="date">End Time</th>
+ <th class="elapse">Elapsed</th>
+ <th class="status">Status</th>
+ <th class="cause">Cancellation Cause</th>
+ </tr>
+ </thead>
+ <tbody id="triggerExecutableBody">
+ </tbody>
+ </table>
+ </div><!-- /.col-xs-12 -->
+ </div><!-- /.row -->
+ </div><!-- /.container-full -->
+
## Job List View
<div class="container-full" id="jobListView">
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm
index c7edd51..66916c8 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm
@@ -21,7 +21,6 @@
#parse("azkaban/webapp/servlet/velocity/style.vm")
#parse("azkaban/webapp/servlet/velocity/javascript.vm")
<script type="text/javascript" src="${context}/js/azkaban/util/ajax.js"></script>
- #
<script type="text/javascript" src="${context}/js/azkaban/view/executions.js"></script>
<script type="text/javascript" src="${context}/js/jquery/jquery.tablesorter.js"></script>
<script type="text/javascript">
@@ -58,8 +57,7 @@
showDialog("Error", data.error);
}
else {
- showDialog("Killed", "Trigger has been killed.");
-
+ showDialog("Killed", "Trigger " + id + " has been killed.");
}
};
ajaxCall(requestURL, requestData, successHandler);
@@ -85,14 +83,14 @@
</head>
<body>
- #set ($current_page="executing")
+ #set ($current_page="flowtriggerinstance")
#parse ("azkaban/webapp/servlet/velocity/nav.vm")
## Page header.
<div class="az-page-header">
<div class="container-full">
- <h1><a href="${context}/executor">Executing Triggers</a></h1>
+ <h1><a href="${context}/flowtriggerinstance">Executing Triggers</a></h1>
</div>
</div>
@@ -109,7 +107,7 @@
<div class="row" id="currently-running-view">
<div class="col-xs-12">
- <table id="executingJobs"
+ <table id="executingTriggers"
class="table table-striped table-bordered table-hover table-condensed executions-table">
<thead>
<tr>
@@ -137,7 +135,8 @@
</td>
<td>
#if (${trigger.getId()})
- ${trigger.getId()}
+ <a href="${context}/executor?triggerinstanceid=${trigger.getId()}" }>
+ ${trigger.getId()} </a>
#else
-
#end
@@ -285,7 +284,8 @@
</td>
<td>
#if (${trigger.getId()})
- ${trigger.getId()}
+ <a href="${context}/executor?triggerinstanceid=${trigger.getId()}" }>
+ ${trigger.getId()} </a>
#else
-
#end
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowpage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowpage.vm
index 37445e3..7c0e4b7 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -101,6 +101,7 @@
<ul class="nav nav-tabs nav-sm" id="headertabs">
<li id="graphViewLink"><a href="#graph">Graph</a></li>
<li id="executionsViewLink"><a href="#executions">Executions</a></li>
+ <li id="flowtriggersViewLink"><a href="#flowtriggers">Flow Triggers</a></li>
<li id="summaryViewLink"><a href="#summary">Summary</a></li>
</ul>
</div>
@@ -146,6 +147,40 @@
</div>
</div><!-- /.container-fill -->
+ ## Flow trigger view.
+ <div class="container-full" id="flowtriggerView">
+ <div class="row">
+ <div class="col-xs-12">
+
+ <table class="table table-striped table-bordered table-condensed table-hover"
+ id="execTable">
+ <thead>
+ <tr>
+ <th>Flow Trigger Instance Id</th>
+ <th>Submitting user</th>
+ <th class="date">Start Time</th>
+ <th class="date">End Time</th>
+ <th class="elapse">Elapsed</th>
+ <th class="status">Status</th>
+ <th class="action">Action</th>
+ </tr>
+ </thead>
+ <tbody id="triggerTableBody">
+ </tbody>
+ </table>
+ <ul id="pageSelection" class="pagination">
+ <li id="previous" class="first"><a><span class="arrow">←</span>Previous</a></li>
+ <li id="page1"><a href="#page1">1</a></li>
+ <li id="page2"><a href="#page2">2</a></li>
+ <li id="page3"><a href="#page3">3</a></li>
+ <li id="page4"><a href="#page4">4</a></li>
+ <li id="page5"><a href="#page5">5</a></li>
+ <li id="next"><a>Next<span class="arrow">→</span></a></li>
+ </ul>
+ </div>
+ </div>
+ </div><!-- /.container-fill -->
+
## Summary view.
<div class="container-full" id="summaryView">
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
index 383d17e..ec8c033 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
@@ -21,8 +21,6 @@
#parse("azkaban/webapp/servlet/velocity/style.vm")
#parse("azkaban/webapp/servlet/velocity/javascript.vm")
<script type="text/javascript" src="${context}/js/azkaban/util/ajax.js"></script>
- #
- <script type="text/javascript" src="${context}/js/azkaban/view/executions.js"></script>
<script type="text/javascript" src="${context}/js/jquery/jquery.tablesorter.js"></script>
<script type="text/javascript">
var contextURL = "${context}";
@@ -39,14 +37,14 @@
</head>
<body>
- #set ($current_page="executing")
+ #set ($current_page="flowtrigger")
#parse ("azkaban/webapp/servlet/velocity/nav.vm")
## Page header.
<div class="az-page-header">
<div class="container-full">
- <h1><a href="${context}/executor">Scheduled Flow Triggers</a></h1>
+ <h1><a href="${context}/flowtrigger">Scheduled Flow Triggers</a></h1>
</div>
</div>
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/nav.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/nav.vm
index a7609a0..dad4d44 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/nav.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/nav.vm
@@ -105,6 +105,11 @@
<li#if($current_page == 'history') class="active"#end
onClick="navMenuClick('$!context/history')"><a
href="$!context/history">History</a></li>
+ <li#if($current_page == 'flowtrigger') class="active"#end
+ onClick="navMenuClick('$!context/flowtrigger')">
+ <a
+ href="$!context/flowtrigger">Flow Trigger Schedule</a></li>
+
#foreach ($viewer in $viewers)
#if (!$viewer.hidden)
<li#if($current_page == $viewer.pluginName) class="active"#end
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/svgflowincludes.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/svgflowincludes.vm
index 2d290a5..488584a 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/svgflowincludes.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/svgflowincludes.vm
@@ -32,6 +32,7 @@
<script type="text/javascript" src="${context}/js/azkaban/util/flow-loader.js"></script>
<script type="text/javascript" src="${context}/js/azkaban/view/job-list.js"></script>
<script type="text/javascript" src="${context}/js/azkaban/model/svg-graph.js"></script>
+<script type="text/javascript" src="${context}/js/azkaban/model/flow-trigger.js"></script>
<script type="text/javascript" src="${context}/js/azkaban/view/svg-graph.js"></script>
<link rel="stylesheet" type="text/css" href="${context}/css/azkaban-graph.css"/>
diff --git a/azkaban-web-server/src/main/tl/flowsummary.tl b/azkaban-web-server/src/main/tl/flowsummary.tl
index 74e40bd..aafdef1 100644
--- a/azkaban-web-server/src/main/tl/flowsummary.tl
+++ b/azkaban-web-server/src/main/tl/flowsummary.tl
@@ -84,6 +84,85 @@
</div>
{/schedule}
- <h3>Last Run Stats</h3>
+ <div class="row">
+ <div class="col-xs-12">
+ <h3>
+ Flow Trigger
+ </h3>
+ {?flowtrigger}
+ <table class="table table-condensed table-bordered">
+ <tbody>
+ <tr>
+ <td class="property-key">Max Wait Mins</td>
+ <td class="property-value-half">{flowtrigger.maxWaitMin}</td>
+ <td class="property-key">Submitted By</td>
+ <td class="property-value-half">{flowtrigger.submitUser}</td>
+ </tr>
+ <tr>
+ <td class="property-key">First Scheduled to Run</td>
+ <td class="property-value-half">{flowtrigger.firstSchedTime}</td>
+
+ <td class="property-key">
+ Cron Expression
+ </td>
+
+ <td class="property-value-half">
+ {?flowtrigger.cronExpression}
+ {flowtrigger.cronExpression}
+ {/flowtrigger.cronExpression}
+ </td>
+
+ </tr>
+ <tr>
+ <td class="property-key">Next Execution Time</td>
+ <td class="property-value-half">{flowtrigger.nextExecTime}</td>
+ <td class="property-key">Has Dependency</td>
+ <td class="property-value-half">
+ {?flowtrigger.dependencies}
+ true
+ {:else}
+ false
+ {/flowtrigger.dependencies}
+ <div class="pull-right">
+ <button type="button" id="showFlowTrigger" data-toggle="modal"
+ class="btn btn-sm btn-info" data-target="#dependencyList">Show Dependency
+ </button>
+ </div>
+
+ <div class="modal fade" id="dependencyList" tabindex="-1"
+ role="dialog"
+ aria-labelledby="dependencyLabel">
+ <div class="modal-dialog" role="document">
+ <div class="modal-content">
+ <div class="modal-header">
+ <button type="button" class="close" data-dismiss="modal" aria-label="Close">
+ <span aria-hidden="true">×</span></button>
+ <h4 class="modal-title" id="dependencyLabel">Dependencies</h4>
+ </div>
+ <div class="modal-body">
+ <pre>{flowtrigger.dependencies}</pre>
+ </div>
+ <div class="modal-footer">
+ <button type="button" class="btn btn-default" data-dismiss="modal">Close
+ </button>
+ </div>
+ </div>
+ </div>
+ </div>
+
+ </td>
+ </tr>
+ </tbody>
+ </table>
+ {:else}
+ <div class="callout callout-default">
+ <h4>None</h4>
+ <p>This flow doesn't have flow trigger.</p>
+ </div>
+ {/flowtrigger}
+
+ </div>
+
+ <h3>Last Run Stats</h3>
+ </div>
</div>
-</div>
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
index 35ca072..5b00576 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -368,6 +368,31 @@ public class FlowTriggerInstanceLoaderTest {
}
@Test
+ public void testGetTriggerInstancesStartTimeDesc() {
+ final List<TriggerInstance> expected = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ expected.add(this.createTriggerInstance(this.flowTrigger, this
+ .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()
+ + i * 1000));
+ }
+
+ this.shuffleAndUpload(expected);
+ final Collection<TriggerInstance> actual = this.triggerInstLoader.getTriggerInstances
+ (this.project_id, this.flow_id, 0, 10);
+ expected.sort((o1, o2) -> ((Long) o2.getStartTime()).compareTo(o1.getStartTime()));
+
+ assertTwoTriggerInstanceListsEqual(new ArrayList<>(actual), new ArrayList<>(expected), true,
+ true);
+ }
+
+ @Test
+ public void testGetEmptyTriggerInstancesStartTimeDesc() {
+ final Collection<TriggerInstance> actual = this.triggerInstLoader.getTriggerInstances
+ (this.project_id, this.flow_id, 0, 10);
+ assertThat(actual).isEmpty();
+ }
+
+ @Test
public void testGetRecentlyFinished() throws InterruptedException {
final List<TriggerInstance> all = new ArrayList<>();
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
index 66d4ce7..2c99af7 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
@@ -105,4 +105,20 @@ public class MockFlowTriggerInstanceLoader implements FlowTriggerInstanceLoader
}
return null;
}
+
+ @Override
+ public TriggerInstance getTriggerInstanceByFlowExecId(final int execId) {
+ for (final TriggerInstance inst : this.triggerInstances) {
+ if (inst.getFlowExecId() == execId) {
+ return inst;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Collection<TriggerInstance> getTriggerInstances(final int projectId, final String flowId,
+ final int from, final int length) {
+ throw new UnsupportedOperationException("Not Yet Implemented");
+ }
}
diff --git a/azkaban-web-server/src/web/js/azkaban/model/flow-trigger.js b/azkaban-web-server/src/web/js/azkaban/model/flow-trigger.js
new file mode 100644
index 0000000..1965076
--- /dev/null
+++ b/azkaban-web-server/src/web/js/azkaban/model/flow-trigger.js
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+$.namespace('azkaban');
+
+azkaban.FlowTriggerModel = Backbone.Model.extend({
+ initialize: function () {
+
+ },
+
+ /*
+ * Process and add data from JSON.
+ */
+ addTrigger: function (data) {
+ this.set({'data': data});
+ }
+
+});
azkaban-web-server/src/web/js/azkaban/view/exflow.js 121(+92 -29)
diff --git a/azkaban-web-server/src/web/js/azkaban/view/exflow.js b/azkaban-web-server/src/web/js/azkaban/view/exflow.js
index 3ad9fa7..66b7f55 100644
--- a/azkaban-web-server/src/web/js/azkaban/view/exflow.js
+++ b/azkaban-web-server/src/web/js/azkaban/view/exflow.js
@@ -88,6 +88,7 @@ var flowTabView;
azkaban.FlowTabView = Backbone.View.extend({
events: {
"click #graphViewLink": "handleGraphLinkClick",
+ "click #flowTriggerlistViewLink": "handleFlowTriggerLinkClick",
"click #jobslistViewLink": "handleJobslistLinkClick",
"click #flowLogViewLink": "handleLogLinkClick",
"click #statsViewLink": "handleStatsLinkClick",
@@ -125,21 +126,39 @@ azkaban.FlowTabView = Backbone.View.extend({
$("#jobslistViewLink").removeClass("active");
$("#graphViewLink").addClass("active");
$("#flowLogViewLink").removeClass("active");
+ $("#flowTriggerlistViewLink").removeClass("active");
$("#statsViewLink").removeClass("active");
$("#jobListView").hide();
+ $("#flowTriggerListView").hide();
$("#graphView").show();
$("#flowLogView").hide();
$("#statsView").hide();
},
+ handleFlowTriggerLinkClick: function () {
+ $("#jobslistViewLink").removeClass("active");
+ $("#graphViewLink").removeClass("active");
+ $("#flowLogViewLink").removeClass("active");
+ $("#flowTriggerlistViewLink").addClass("active");
+ $("#statsViewLink").removeClass("active");
+
+ $("#jobListView").hide();
+ $("#flowTriggerListView").show();
+ $("#graphView").hide();
+ $("#flowLogView").hide();
+ $("#statsView").hide();
+ },
+
handleJobslistLinkClick: function () {
$("#graphViewLink").removeClass("active");
$("#jobslistViewLink").addClass("active");
$("#flowLogViewLink").removeClass("active");
+ $("#flowTriggerlistViewLink").removeClass("active");
$("#statsViewLink").removeClass("active");
$("#graphView").hide();
+ $("#flowTriggerListView").hide();
$("#jobListView").show();
$("#flowLogView").hide();
$("#statsView").hide();
@@ -147,11 +166,13 @@ azkaban.FlowTabView = Backbone.View.extend({
handleLogLinkClick: function () {
$("#graphViewLink").removeClass("active");
+ $("#flowTriggerlistViewLink").removeClass("active");
$("#jobslistViewLink").removeClass("active");
$("#flowLogViewLink").addClass("active");
$("#statsViewLink").removeClass("active");
$("#graphView").hide();
+ $("#flowTriggerListView").hide();
$("#jobListView").hide();
$("#flowLogView").show();
$("#statsView").hide();
@@ -159,11 +180,13 @@ azkaban.FlowTabView = Backbone.View.extend({
handleStatsLinkClick: function () {
$("#graphViewLink").removeClass("active");
+ $("#flowTriggerlistViewLink").removeClass("active");
$("#jobslistViewLink").removeClass("active");
$("#flowLogViewLink").removeClass("active");
$("#statsViewLink").addClass("active");
$("#graphView").hide();
+ $("#flowTriggerListView").hide();
$("#jobListView").hide();
$("#flowLogView").hide();
statsView.show();
@@ -392,6 +415,7 @@ azkaban.StatsView = Backbone.View.extend({
var graphModel;
var logModel;
+var flowTriggerModel;
azkaban.LogModel = Backbone.Model.extend({});
var updateStatus = function (updateTime) {
@@ -422,33 +446,33 @@ var updateStatus = function (updateTime) {
}
function updatePastAttempts(data, update) {
- if (!update.pastAttempts) {
- return;
- }
-
- if (data.pastAttempts) {
- for (var i = 0; i < update.pastAttempts.length; ++i) {
- var updatedAttempt = update.pastAttempts[i];
- var found = false;
- for (var j = 0; j < data.pastAttempts.length; ++j) {
- var attempt = data.pastAttempts[j];
- if (attempt.attempt == updatedAttempt.attempt) {
- attempt.startTime = updatedAttempt.startTime;
- attempt.endTime = updatedAttempt.endTime;
- attempt.status = updatedAttempt.status;
- found = true;
- break;
- }
- }
-
- if (!found) {
- data.pastAttempts.push(updatedAttempt);
- }
- }
- }
- else {
- data.pastAttempts = update.pastAttempts;
- }
+ if (!update.pastAttempts) {
+ return;
+ }
+
+ if (data.pastAttempts) {
+ for (var i = 0; i < update.pastAttempts.length; ++i) {
+ var updatedAttempt = update.pastAttempts[i];
+ var found = false;
+ for (var j = 0; j < data.pastAttempts.length; ++j) {
+ var attempt = data.pastAttempts[j];
+ if (attempt.attempt == updatedAttempt.attempt) {
+ attempt.startTime = updatedAttempt.startTime;
+ attempt.endTime = updatedAttempt.endTime;
+ attempt.status = updatedAttempt.status;
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ data.pastAttempts.push(updatedAttempt);
+ }
+ }
+ }
+ else {
+ data.pastAttempts = update.pastAttempts;
+ }
}
var updateGraph = function (data, update) {
@@ -630,6 +654,7 @@ $(function () {
var selected;
graphModel = new azkaban.GraphModel();
+ flowTriggerModel = new azkaban.FlowTriggerModel();
logModel = new azkaban.LogModel();
flowTabView = new azkaban.FlowTabView({
@@ -680,8 +705,26 @@ $(function () {
model: graphModel
});
- var requestURL = contextURL + "/executor";
- var requestData = {"execid": execId, "ajax": "fetchexecflow"};
+ flowTriggerInstanceListView = new azkaban.FlowTriggerInstanceListView({
+ el: $('#flowTriggerListView'),
+ model: flowTriggerModel
+ });
+
+ var requestURL;
+ var requestData;
+ if (execId != "-1" && execId != "-2") {
+ requestURL = contextURL + "/executor";
+ requestData = {"execid": execId, "ajax": "fetchexecflow"};
+ }
+ else {
+ requestURL = contextURL + "/manager";
+ requestData = {
+ "project": projectName,
+ "ajax": "fetchflowgraph",
+ "flow": flowId
+ };
+ }
+
var successHandler = function (data) {
console.log("data fetched");
graphModel.addFlow(data);
@@ -702,6 +745,9 @@ $(function () {
else if (hash == "#stats") {
flowTabView.handleStatsLinkClick();
}
+ else if (hash == "#triggerslist") {
+ flowTabView.handleFlowTriggerLinkClick();
+ }
}
else {
flowTabView.handleGraphLinkClick();
@@ -710,4 +756,21 @@ $(function () {
logUpdaterFunction();
};
ajaxCall(requestURL, requestData, successHandler);
+
+ requestURL = contextURL + "/flowtriggerinstance";
+ if (execId != "-1" && execId != "-2") {
+ requestData = {"execid": execId, "ajax": "fetchTriggerStatus"};
+ }
+ else if (triggerInstanceId != "-1") {
+ requestData = {
+ "triggerinstid": triggerInstanceId,
+ "ajax": "fetchTriggerStatus"
+ };
+ }
+
+ successHandler = function (data) {
+ flowTriggerModel.addTrigger(data)
+ flowTriggerModel.trigger("change:trigger");
+ };
+ ajaxCall(requestURL, requestData, successHandler);
});
azkaban-web-server/src/web/js/azkaban/view/flow.js 102(+91 -11)
diff --git a/azkaban-web-server/src/web/js/azkaban/view/flow.js b/azkaban-web-server/src/web/js/azkaban/view/flow.js
index 6d5b669..014671e 100644
--- a/azkaban-web-server/src/web/js/azkaban/view/flow.js
+++ b/azkaban-web-server/src/web/js/azkaban/view/flow.js
@@ -33,6 +33,7 @@ azkaban.FlowTabView = Backbone.View.extend({
events: {
"click #graphViewLink": "handleGraphLinkClick",
"click #executionsViewLink": "handleExecutionLinkClick",
+ "click #flowtriggersViewLink": "handleFlowTriggerLinkClick",
"click #summaryViewLink": "handleSummaryLinkClick"
},
@@ -53,30 +54,49 @@ azkaban.FlowTabView = Backbone.View.extend({
handleGraphLinkClick: function () {
$("#executionsViewLink").removeClass("active");
$("#graphViewLink").addClass("active");
+ $("#flowtriggersViewLink").removeClass("active");
$('#summaryViewLink').removeClass('active');
- $("#executionsView").hide();
$("#graphView").show();
+ $("#flowtriggerView").hide();
+ $("#executionsView").hide();
$('#summaryView').hide();
},
handleExecutionLinkClick: function () {
$("#graphViewLink").removeClass("active");
$("#executionsViewLink").addClass("active");
+ $("#flowtriggersViewLink").removeClass("active");
$('#summaryViewLink').removeClass('active');
$("#graphView").hide();
+ $("#flowtriggerView").hide();
$("#executionsView").show();
$('#summaryView').hide();
executionModel.trigger("change:view");
},
+ handleFlowTriggerLinkClick: function () {
+ $("#graphViewLink").removeClass("active");
+ $("#executionsViewLink").removeClass("active");
+ $("#flowtriggersViewLink").addClass("active");
+ $('#summaryViewLink').removeClass('active');
+
+ $("#graphView").hide();
+ $("#flowtriggerView").show();
+ $("#executionsView").hide();
+ $('#summaryView').hide();
+ flowTriggerModel.trigger("change:view");
+ },
+
handleSummaryLinkClick: function () {
$('#graphViewLink').removeClass('active');
$('#executionsViewLink').removeClass('active');
+ $("#flowtriggersViewLink").removeClass("active");
$('#summaryViewLink').addClass('active');
$('#graphView').hide();
+ $("#flowtriggerView").hide();
$('#executionsView').hide();
$('#summaryView').show();
},
@@ -101,7 +121,13 @@ azkaban.ExecutionsView = Backbone.View.extend({
render: function (evt) {
console.log("render");
// Render page selections
- var tbody = $("#execTableBody");
+ var content = this.model.get("content");
+ if (content == "flow") {
+ var tbody = $("#execTableBody");
+ }
+ else {
+ var tbody = $("#triggerTableBody");
+ }
tbody.empty();
var executions = this.model.get("executions");
@@ -110,9 +136,16 @@ azkaban.ExecutionsView = Backbone.View.extend({
var tdId = document.createElement("td");
var execA = document.createElement("a");
- $(execA).attr("href", contextURL + "/executor?execid="
- + executions[i].execId);
- $(execA).text(executions[i].execId);
+ if (content == "flow") {
+ $(execA).attr("href", contextURL + "/executor?execid="
+ + executions[i].execId);
+ $(execA).text(executions[i].execId);
+ }
+ else {
+ $(execA).attr("href", contextURL + "/executor?triggerinstanceid="
+ + executions[i].instanceId);
+ $(execA).text(executions[i].instanceId);
+ }
tdId.appendChild(execA);
row.appendChild(tdId);
@@ -132,7 +165,7 @@ azkaban.ExecutionsView = Backbone.View.extend({
var endTime = "-";
var lastTime = executions[i].endTime;
- if (executions[i].endTime != -1) {
+ if (executions[i].endTime != -1 && executions[i].endTime != 0) {
var endDateTime = new Date(executions[i].endTime);
endTime = getDateFormat(endDateTime);
}
@@ -152,7 +185,12 @@ azkaban.ExecutionsView = Backbone.View.extend({
var status = document.createElement("div");
$(status).addClass("status");
$(status).addClass(executions[i].status);
- $(status).text(statusStringMap[executions[i].status]);
+ if (content == "flow") {
+ $(status).text(statusStringMap[executions[i].status]);
+ }
+ else {
+ $(status).text(executions[i].status);
+ }
tdStatus.appendChild(status);
row.appendChild(tdStatus);
@@ -248,7 +286,7 @@ azkaban.ExecutionsView = Backbone.View.extend({
if ($(evt.currentTarget).hasClass("disabled")) {
return;
}
- var page = evt.currentTarget.page;
+ var page = evt.currentTarget.innerText;
this.model.set({"page": page});
},
@@ -264,18 +302,27 @@ azkaban.ExecutionsView = Backbone.View.extend({
handlePageChange: function (evt) {
var page = this.model.get("page") - 1;
var pageSize = this.model.get("pageSize");
- var requestURL = contextURL + "/manager";
+ var content = this.model.get("content");
+ if (content == 'flow') {
+ requestURL = contextURL + "/manager";
+ }
+ else {
+ requestURL = contextURL + "/flowtriggerinstance";
+ }
var model = this.model;
var requestData = {
"project": projectName,
"flow": flowId,
- "ajax": "fetchFlowExecutions",
+ "ajax": content == 'flow' ? "fetchFlowExecutions"
+ : "fetchTriggerInstances",
"start": page * pageSize,
"length": pageSize
};
+
var successHandler = function (data) {
model.set({
+ "content": content,
"executions": data.executions,
"total": data.total
});
@@ -297,6 +344,7 @@ azkaban.SummaryView = Backbone.View.extend({
this.fetchDetails();
this.fetchSchedule();
+ this.fetchFlowTrigger();
this.model.trigger('render');
},
@@ -380,6 +428,22 @@ azkaban.SummaryView = Backbone.View.extend({
$.get(requestURL, requestData, successHandler, 'json');
},
+ fetchFlowTrigger: function () {
+ var requestURL = contextURL + "/flowtrigger"
+ var requestData = {
+ 'ajax': 'fetchTrigger',
+ 'projectId': projectId,
+ 'flowId': flowId
+ };
+ var model = this.model;
+ var view = this;
+ var successHandler = function (data) {
+ model.set({'flowtrigger': data.flowTrigger});
+ model.trigger('render');
+ };
+ $.get(requestURL, requestData, successHandler, 'json');
+ },
+
handleChangeView: function (evt) {
},
@@ -389,6 +453,7 @@ azkaban.SummaryView = Backbone.View.extend({
flowName: flowId,
jobTypes: this.model.get('jobTypes'),
schedule: this.model.get('schedule'),
+ flowtrigger: this.model.get('flowtrigger'),
};
dust.render("flowsummary", data, function (err, out) {
$('#summary-view-content').html(out);
@@ -402,6 +467,9 @@ var mainSvgGraphView;
var executionModel;
azkaban.ExecutionModel = Backbone.Model.extend({});
+var flowTriggerModel;
+azkaban.FlowTriggerModel = Backbone.Model.extend({});
+
var summaryModel;
azkaban.SummaryModel = Backbone.Model.extend({});
@@ -415,11 +483,19 @@ $(function () {
var selected;
// Execution model has to be created before the window switches the tabs.
executionModel = new azkaban.ExecutionModel();
+ executionModel.set("content", "flow");
executionsView = new azkaban.ExecutionsView({
el: $('#executionsView'),
model: executionModel
});
+ flowTriggerModel = new azkaban.ExecutionModel();
+ flowTriggerModel.set("content", "trigger");
+ flowTriggerView = new azkaban.ExecutionsView({
+ el: $('#flowtriggerView'),
+ model: flowTriggerModel
+ });
+
summaryModel = new azkaban.SummaryModel();
summaryView = new azkaban.SummaryView({
el: $('#summaryView'),
@@ -495,7 +571,11 @@ $(function () {
if (hash == "#summary") {
flowTabView.handleSummaryLinkClick();
}
- else if (hash == "#graph") {
+ if (hash == "#flowtriggers") {
+ flowTabView.handleFlowTriggerLinkClick();
+ }
+
+ if (hash == "#graph") {
// Redundant, but we may want to change the default.
selected = "graph";
}
diff --git a/azkaban-web-server/src/web/js/azkaban/view/flow-execution-list.js b/azkaban-web-server/src/web/js/azkaban/view/flow-execution-list.js
index 3e26410..8719821 100644
--- a/azkaban-web-server/src/web/js/azkaban/view/flow-execution-list.js
+++ b/azkaban-web-server/src/web/js/azkaban/view/flow-execution-list.js
@@ -95,7 +95,6 @@ azkaban.ExecutionListView = Backbone.View.extend({
for (var i = 0; i < nodes.length; ++i) {
var node = nodes[i].changedNode ? nodes[i].changedNode : nodes[i];
-
if (node.status == 'READY') {
continue;
}
@@ -113,11 +112,11 @@ azkaban.ExecutionListView = Backbone.View.extend({
var startTimeTd = $(row).find("> td.startTime");
if (node.startTime == -1) {
- $(startTimeTd).text("-");
+ $(startTimeTd).text("-");
}
else {
- var startdate = new Date(node.startTime);
- $(startTimeTd).text(getDateFormat(startdate));
+ var startdate = new Date(node.startTime);
+ $(startTimeTd).text(getDateFormat(startdate));
}
var endTimeTd = $(row).find("> td.endTime");
diff --git a/azkaban-web-server/src/web/js/azkaban/view/flow-trigger-list.js b/azkaban-web-server/src/web/js/azkaban/view/flow-trigger-list.js
new file mode 100644
index 0000000..2dcded2
--- /dev/null
+++ b/azkaban-web-server/src/web/js/azkaban/view/flow-trigger-list.js
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/*
+ * List of executing triggers on executing flow page.
+ */
+
+var flowTriggerInstanceListView;
+
+function killTrigger(id) {
+ var requestData = {"id": id, "ajax": "killRunningTrigger"};
+ var successHandler = function (data) {
+ console.log("cancel clicked");
+ if (data.error) {
+ showDialog("Error", data.error);
+ }
+ else {
+ showDialog("Killed", "Trigger " + id + " has been killed.");
+
+ }
+ };
+ ajaxCall(contextURL + "/flowtriggerinstance", requestData, successHandler);
+};
+
+azkaban.FlowTriggerInstanceListView = Backbone.View.extend({
+ initialize: function (settings) {
+ this.model.bind("change:trigger", this.renderJobs, this);
+ this.model.bind("change:update", this.updateJobs, this);
+ },
+
+ renderJobs: function (evt) {
+ var data = this.model.get("data");
+ var lastTime = data.endTime == -1 ? (new Date()).getTime() : data.endTime;
+ var executingBody = $("#triggerExecutableBody");
+ this.updateJobRow(data.items, executingBody);
+
+ var triggerBody = $("#triggerBody");
+ if (data.triggerId) {
+ this.updateTriggerRow(data, triggerBody);
+ }
+
+ },
+
+ updateJobs: function (evt) {
+ var update = this.model.get("update");
+ var lastTime = update.endTime == -1
+ ? (new Date()).getTime()
+ : update.endTime;
+ var executingBody = $("#triggerExecutableBody");
+
+ if (update.nodes) {
+ this.updateJobRow(update.nodes, executingBody);
+ }
+
+ var data = this.model.get("data");
+ var flowLastTime = data.endTime == -1
+ ? (new Date()).getTime()
+ : data.endTime;
+ var flowStartTime = data.startTime;
+ this.updateProgressBar(data, flowStartTime, flowLastTime);
+ },
+
+ updateTriggerRow: function (data, body) {
+ if (!data) {
+ return;
+ }
+ this.addTriggerRow(data, body);
+ },
+
+ addTriggerRow: function (data, body) {
+ var self = this;
+ var tr = document.createElement("tr");
+ var tdId = document.createElement("td");
+ var tdSubmitter = document.createElement("td");
+ var tdStart = document.createElement("td");
+ var tdEnd = document.createElement("td");
+ var tdElapse = document.createElement("td");
+ var tdProps = document.createElement("td");
+ var buttonProps = document.createElement("BUTTON");
+ var tdStatus = document.createElement("td");
+
+ $(tdProps).append(buttonProps);
+ buttonProps.setAttribute("class", "btn btn-sm btn-info");
+ buttonProps.setAttribute("data-toggle", "modal");
+ buttonProps.setAttribute("data-target", "#dependencyList");
+ buttonProps.innerHTML = "Show";
+
+ $(tr).append(tdId);
+ $(tr).append(tdSubmitter);
+ $(tr).append(tdStart);
+ $(tr).append(tdEnd);
+ $(tr).append(tdElapse);
+ $(tr).append(tdStatus);
+ $(tr).append(tdProps);
+
+ $(tr).addClass("triggerRow");
+ $(tdId).addClass("triggerInstanceId");
+ $(tdSubmitter).addClass("triggerSubmitter");
+ $(tdStart).addClass("startTime");
+ $(tdEnd).addClass("endTime");
+ $(tdElapse).addClass("elapsedTime");
+ $(tdStatus).addClass("status");
+ $(tdProps).addClass("props");
+
+ $(tdId).text(data.triggerId);
+ $(tdSubmitter).text(data.triggerSubmitter);
+
+ var startTime = data.triggerStartTime == 0 ? (new Date()).getTime()
+ : data.triggerStartTime;
+
+ var endTime = data.triggerEndTime == 0 ? (new Date()).getTime()
+ : data.triggerEndTime;
+
+ $(tdStart).text(getDateFormat(new Date(startTime)));
+ $(tdEnd).text(getDateFormat(new Date(endTime)));
+
+ if (data.triggerEndTime == 0) {
+ $(tdElapse).text(
+ getDuration(data.triggerStartTime, (new Date()).getTime()));
+ }
+ else {
+ $(tdElapse).text(
+ getDuration(data.triggerStartTime, data.triggerEndTime));
+ }
+ var status = document.createElement("div");
+ $(status).addClass("status");
+ $(status).addClass(data.triggerStatus);
+ $(status).text(data.triggerStatus);
+ tdStatus.appendChild(status);
+
+ $("#dependencyList").children("div").children("div").children(
+ "div")[1].innerHTML = "<pre>" + data.triggerProps + "</pre>";
+
+ // handle action part
+ if (data.triggerStatus === "RUNNING") {
+ var tdAction = document.createElement("td");
+ var tdActionButton = document.createElement("BUTTON");
+
+ tdActionButton.setAttribute("class", "btn btn-danger btn-sm");
+ tdActionButton.setAttribute("onclick", "killTrigger(\"" + data.triggerId
+ + "\")");
+ tdActionButton.innerHTML = "Kill";
+ $(tdAction).append(tdActionButton);
+ $(tr).append(tdAction);
+ }
+ else {
+ var tdAction = document.createElement("td");
+ $(tdAction).text("-");
+ $(tdAction).addClass("triggerAction");
+ $(tr).append(tdAction);
+ }
+
+ $(body).append(tr);
+ },
+
+ updateJobRow: function (nodes, body) {
+ if (!nodes) {
+ return;
+ }
+
+ nodes.sort(function (a, b) {
+ return a.dependencyStartTime - b.dependencyStartTime;
+ });
+ for (var i = 0; i < nodes.length; ++i) {
+ this.addNodeRow(nodes[i], body);
+ }
+ },
+
+ addNodeRow: function (node, body) {
+ var self = this;
+ var tr = document.createElement("tr");
+ var tdId = document.createElement("td");
+ var tdName = document.createElement("td");
+ var tdType = document.createElement("td");
+ //var tdTimeline = document.createElement("td");
+ var tdStart = document.createElement("td");
+ var tdEnd = document.createElement("td");
+ var tdElapse = document.createElement("td");
+ var tdStatus = document.createElement("td");
+ var tdCancelCause = document.createElement("td");
+ //node.joblistrow = tr;
+ //tr.node = node;
+ var padding = 15 * $(body)[0].level;
+
+ $(tr).append(tdId);
+ $(tr).append(tdName);
+ $(tr).append(tdType);
+ $(tr).append(tdStart);
+ $(tr).append(tdEnd);
+ $(tr).append(tdElapse);
+ $(tr).append(tdStatus);
+ $(tr).append(tdCancelCause);
+ $(tr).addClass("depListRow");
+
+ $(tdName).addClass("depname");
+ $(tdType).addClass("deptype");
+ if (padding) {
+ $(tdName).css("padding-left", padding);
+ }
+ $(tdStart).addClass("startTime");
+ $(tdEnd).addClass("endTime");
+ $(tdElapse).addClass("elapsedTime");
+ $(tdStatus).addClass("statustd");
+
+ $(tdId).text(node.triggerInstanceId);
+ $(tdName).text(node.dependencyName);
+ $(tdType).text(node.dependencyType);
+
+ var status = document.createElement("div");
+ $(status).addClass("status");
+ $(status).addClass(node.dependencyStatus);
+ $(status).text(node.dependencyStatus);
+ tdStatus.appendChild(status);
+
+ $(tdCancelCause).text(node.dependencyCancelCause);
+ var startTime = node.dependencyStartTime == 0 ? (new Date()).getTime()
+ : node.dependencyStartTime;
+
+ var endTime = node.dependencyEndTime == 0 ? (new Date()).getTime()
+ : node.dependencyEndTime;
+
+ $(tdStart).text(getDateFormat(new Date(startTime)));
+ $(tdEnd).text(getDateFormat(new Date(endTime)));
+
+ if (node.dependencyEndTime == 0) {
+ $(tdElapse).text(
+ getDuration(node.dependencyStartTime, (new Date()).getTime()));
+ }
+ else {
+ $(tdElapse).text(
+ getDuration(node.dependencyStartTime, node.dependencyEndTime));
+ }
+
+ $(body).append(tr);
+ }
+});
+