Details
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 64d728e..6daca2c 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -31,7 +31,6 @@ import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManager;
import com.google.common.io.Files;
import java.io.File;
-import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -136,11 +135,19 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
@Override
public Collection<TriggerInstance> getIncompleteTriggerInstances() {
- Collection<TriggerInstance> unfinished = Collections.EMPTY_LIST;
+ final Collection<TriggerInstance> unfinished = new ArrayList<>();
try {
- unfinished = this.dbOperator
+ final Collection<TriggerInstance> triggerInstances = this.dbOperator
.query(SELECT_ALL_PENDING_EXECUTIONS, new TriggerInstanceHandler());
+ // select incomplete trigger instances
+ for (final TriggerInstance triggerInst : triggerInstances) {
+ if (!Status.isDone(triggerInst.getStatus()) || (triggerInst.getStatus() == Status.SUCCEEDED
+ && triggerInst.getFlowExecId() == Constants.UNASSIGNED_EXEC_ID)) {
+ unfinished.add(triggerInst);
+ }
+ }
+
// backfilling flow trigger for unfinished trigger instances
// dedup flow config id with a set to avoid downloading/parsing same flow file multiple times
@@ -165,7 +172,7 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
} else {
logger.error("Unable to find flow file for " + flowConfigID);
}
- } catch (final IOException ex) {
+ } catch (final Exception ex) {
logger.error("error in getting flow file", ex);
} finally {
FlowLoaderUtils.cleanUpDir(tempDir);
@@ -181,7 +188,6 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
} catch (final SQLException ex) {
handleSQLException(ex);
}
-
return unfinished;
}
@@ -314,7 +320,7 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
} else {
logger.error("Unable to find flow file for " + triggerInstanceId);
}
- } catch (final IOException ex) {
+ } catch (final Exception ex) {
logger.error("error in getting flow file", ex);
} finally {
FlowLoaderUtils.cleanUpDir(tempDir);
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index de98efb..cb6dd5b 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -248,13 +248,34 @@ public class FlowTriggerService {
public void recoverIncompleteTriggerInstances() {
final Collection<TriggerInstance> unfinishedTriggerInstances = this.flowTriggerInstanceLoader
.getIncompleteTriggerInstances();
- //todo chengren311: what if flow trigger is not found?
for (final TriggerInstance triggerInstance : unfinishedTriggerInstances) {
if (triggerInstance.getFlowTrigger() != null) {
recover(triggerInstance);
} else {
- logger.error(String.format("cannot recover the trigger instance %s, flow trigger is null ",
- triggerInstance.getId()));
+ logger.error(String.format("cannot recover the trigger instance %s, flow trigger is null,"
+ + " cancelling it ", triggerInstance.getId()));
+
+ //finalize unrecoverable trigger instances
+ // the following situation would cause trigger instances unrecoverable:
+ // 1. project A with flow A associated with flow trigger A is uploaded
+ // 2. flow trigger A starts to run
+ // 3. project A with flow B without any flow trigger is uploaded
+ // 4. web server restarts
+ // in this case, flow trigger instance of flow trigger A will be equipped with latest
+ // project, thus failing to find the flow trigger since new project doesn't contain flow
+ // trigger at all
+ if (isDoneButFlowNotExecuted(triggerInstance)) {
+ triggerInstance.setFlowExecId(Constants.FAILED_EXEC_ID);
+ this.flowTriggerInstanceLoader.updateAssociatedFlowExecId(triggerInstance);
+ } else {
+ for (final DependencyInstance depInst : triggerInstance.getDepInstances()) {
+ if (!Status.isDone(depInst.getStatus())) {
+ processStatusAndCancelCauseUpdate(depInst, Status.CANCELLED,
+ CancellationCause.FAILURE);
+ this.triggerProcessor.processTermination(depInst.getTriggerInstance());
+ }
+ }
+ }
}
}
}
@@ -516,6 +537,7 @@ public class FlowTriggerService {
this.singleThreadExecutorService.shutdownNow(); // Cancel currently executing tasks
this.multiThreadsExecutorService.shutdown();
this.multiThreadsExecutorService.shutdownNow();
+ this.triggerProcessor.shutdown();
this.triggerPluginManager.shutdown();
}
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java
index 8825987..bf0e72f 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java
@@ -38,8 +38,9 @@ public class TriggerInstance {
//todo chengren311: convert it to builder
public TriggerInstance(final String id, final FlowTrigger flowTrigger, final String flowId,
- final int flowVersion, final String submitUser, final List<DependencyInstance> depInstances,
- final int flowExecId, final Project project) {
+ final int flowVersion, final String submitUser, final List<DependencyInstance>
+ depInstances, final int flowExecId, final Project project) {
+
this.depInstances = ImmutableList.copyOf(depInstances);
this.id = id;
this.flowTrigger = flowTrigger;
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
index c584dc7..f17c499 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
@@ -60,6 +60,11 @@ public class TriggerInstanceProcessor {
this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
}
+ public void shutdown() {
+ this.executorService.shutdown();
+ this.executorService.shutdownNow();
+ }
+
private void executeFlowAndUpdateExecID(final TriggerInstance triggerInst) {
try {
final Project project = triggerInst.getProject();