Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
index 2a26c1a..9f942e6 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
@@ -16,6 +16,9 @@
package azkaban.executor;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
public class ExecutionReference {
private final int execId;
@@ -30,11 +33,7 @@ public class ExecutionReference {
this.execId = execId;
}
- public ExecutionReference(final int execId, final Executor executor) {
- if (executor == null) {
- throw new IllegalArgumentException(String.format(
- "Executor cannot be null for exec id: %d ExecutionReference", execId));
- }
+ public ExecutionReference(final int execId, @Nullable final Executor executor) {
this.execId = execId;
this.executor = executor;
}
@@ -59,14 +58,6 @@ public class ExecutionReference {
return this.execId;
}
- public String getHost() {
- return this.executor.getHost();
- }
-
- public int getPort() {
- return this.executor.getPort();
- }
-
public int getNumErrors() {
return this.numErrors;
}
@@ -75,11 +66,11 @@ public class ExecutionReference {
this.numErrors = numErrors;
}
- public Executor getExecutor() {
- return this.executor;
+ public Optional<Executor> getExecutor() {
+ return Optional.ofNullable(this.executor);
}
- public void setExecutor(final Executor executor) {
+ public void setExecutor(final @Nullable Executor executor) {
this.executor = executor;
}
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
index b3ad9c7..b536555 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiGateway.java
@@ -46,14 +46,16 @@ public class ExecutorApiGateway {
Map<String, Object> callWithReference(final ExecutionReference ref, final String action,
final Pair<String, String>... params) throws ExecutorManagerException {
- return callWithExecutionId(ref.getHost(), ref.getPort(), action, ref.getExecId(),
+ final Executor executor = ref.getExecutor().get();
+ return callWithExecutionId(executor.getHost(), executor.getPort(), action, ref.getExecId(),
null, params);
}
Map<String, Object> callWithReferenceByUser(final ExecutionReference ref,
final String action, final String user, final Pair<String, String>... params)
throws ExecutorManagerException {
- return callWithExecutionId(ref.getHost(), ref.getPort(), action,
+ final Executor executor = ref.getExecutor().get();
+ return callWithExecutionId(executor.getHost(), executor.getPort(), action,
ref.getExecId(), user, params);
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 5ec1d96..8f2976f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -54,6 +54,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -396,10 +397,6 @@ public class ExecutorManager extends EventHandler implements
return this.lastThreadCheckTime;
}
- public long getLastCleanerThreadCheckTime() {
- return this.lastCleanerThreadCheckTime;
- }
-
@Override
public Collection<Executor> getAllActiveExecutors() {
return Collections.unmodifiableCollection(this.activeExecutors);
@@ -441,7 +438,10 @@ public class ExecutorManager extends EventHandler implements
for (final Pair<ExecutionReference, ExecutableFlow> running : this.runningFlows
.values()) {
final ExecutionReference ref = running.getFirst();
- ports.add(ref.getHost() + ":" + ref.getPort());
+ if (ref.getExecutor().isPresent()) {
+ final Executor executor = ref.getExecutor().get();
+ ports.add(executor.getHost() + ":" + executor.getPort());
+ }
}
return ports;
}
@@ -508,9 +508,9 @@ public class ExecutorManager extends EventHandler implements
* @see azkaban.executor.ExecutorManagerAdapter#getActiveFlowsWithExecutor()
*/
@Override
- public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+ public List<Pair<ExecutableFlow, Optional<Executor>>> getActiveFlowsWithExecutor()
throws IOException {
- final List<Pair<ExecutableFlow, Executor>> flows =
+ final List<Pair<ExecutableFlow, Optional<Executor>>> flows =
new ArrayList<>();
getActiveFlowsWithExecutorHelper(flows, this.queuedFlows.getAllEntries());
getActiveFlowsWithExecutorHelper(flows, this.runningFlows.values());
@@ -519,7 +519,7 @@ public class ExecutorManager extends EventHandler implements
/* Helper method for getActiveFlowsWithExecutor */
private void getActiveFlowsWithExecutorHelper(
- final List<Pair<ExecutableFlow, Executor>> flows,
+ final List<Pair<ExecutableFlow, Optional<Executor>>> flows,
final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
flows.add(new Pair<>(ref.getSecond(), ref
@@ -1445,15 +1445,15 @@ public class ExecutorManager extends EventHandler implements
}
/* Group Executable flow by Executors to reduce number of REST calls */
- private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
- final HashMap<Executor, List<ExecutableFlow>> exFlowMap =
+ private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
+ final HashMap<Optional<Executor>, List<ExecutableFlow>> exFlowMap =
new HashMap<>();
for (final Pair<ExecutionReference, ExecutableFlow> runningFlow : this.runningFlows
.values()) {
final ExecutionReference ref = runningFlow.getFirst();
final ExecutableFlow flow = runningFlow.getSecond();
- final Executor executor = ref.getExecutor();
+ final Optional<Executor> executor = ref.getExecutor();
// We can set the next check time to prevent the checking of certain
// flows.
@@ -1553,20 +1553,27 @@ public class ExecutorManager extends EventHandler implements
ExecutorManager.this.lastThreadCheckTime = System.currentTimeMillis();
ExecutorManager.this.updaterStage = "Starting update all flows.";
- final Map<Executor, List<ExecutableFlow>> exFlowMap =
+ final Map<Optional<Executor>, List<ExecutableFlow>> exFlowMap =
getFlowToExecutorMap();
- final ArrayList<ExecutableFlow> finishedFlows =
- new ArrayList<>();
final ArrayList<ExecutableFlow> finalizeFlows =
new ArrayList<>();
if (exFlowMap.size() > 0) {
- for (final Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
+ for (final Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : exFlowMap
.entrySet()) {
final List<Long> updateTimesList = new ArrayList<>();
final List<Integer> executionIdsList = new ArrayList<>();
- final Executor executor = entry.getKey();
+ final Optional<Executor> executorOption = entry.getKey();
+ if (!executorOption.isPresent()) {
+ for (final ExecutableFlow flow : entry.getValue()) {
+ logger.warn("Finalizing execution " + flow.getExecutionId()
+ + ". Executor id of this execution doesn't exist");
+ finalizeFlows.add(flow);
+ }
+ continue;
+ }
+ final Executor executor = executorOption.get();
ExecutorManager.this.updaterStage =
"Starting update flows on " + executor.getHost() + ":"
@@ -1609,7 +1616,7 @@ public class ExecutorManager extends EventHandler implements
+ this.errorThreshold);
ref.setNumErrors(++numErrors);
} else {
- logger.error("Evicting flow " + flow.getExecutionId()
+ logger.warn("Evicting execution " + flow.getExecutionId()
+ ". The executor is unresponsive.");
// TODO should send out an unresponsive email here.
finalizeFlows.add(pair.getSecond());
@@ -1630,7 +1637,6 @@ public class ExecutorManager extends EventHandler implements
ExecutorManager.this.updaterStage = "Updated flow " + flow.getExecutionId();
if (isFinished(flow)) {
- finishedFlows.add(flow);
finalizeFlows.add(flow);
}
} catch (final ExecutorManagerException e) {
@@ -1638,7 +1644,7 @@ public class ExecutorManager extends EventHandler implements
logger.error(e);
if (flow != null) {
- logger.error("Finalizing flow " + flow.getExecutionId());
+ logger.warn("Finalizing execution " + flow.getExecutionId());
finalizeFlows.add(flow);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index c939d6f..2c2cd7a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -25,6 +25,7 @@ import java.lang.Thread.State;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
public interface ExecutorManagerAdapter {
@@ -44,7 +45,7 @@ public interface ExecutorManagerAdapter {
* Note, returns empty list if there isn't any running or queued flows
* </pre>
*/
- public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+ public List<Pair<ExecutableFlow, Optional<Executor>>> getActiveFlowsWithExecutor()
throws IOException;
public List<ExecutableFlow> getRecentlyFinishedFlows();
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index 93796a8..938b5d4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -16,8 +16,8 @@
package azkaban.executor;
-import azkaban.db.EncodingType;
import azkaban.db.DatabaseOperator;
+import azkaban.db.EncodingType;
import azkaban.utils.GZIPUtils;
import azkaban.utils.Pair;
import java.io.IOException;
@@ -29,10 +29,13 @@ import java.util.Map;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
@Singleton
public class FetchActiveFlowDao {
+ private static final Logger logger = Logger.getLogger(FetchActiveFlowDao.class);
+
private final DatabaseOperator dbOperator;
@Inject
@@ -56,9 +59,9 @@ public class FetchActiveFlowDao {
// Select running and executor assigned flows
private static final String FETCH_ACTIVE_EXECUTABLE_FLOW =
"SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
- + "et.port port, et.id executorId, et.active executorStatus"
+ + "et.port port, ex.executor_id executorId, et.active executorStatus"
+ " FROM execution_flows ex"
- + " INNER JOIN "
+ + " LEFT JOIN "
+ " executors et ON ex.executor_id = et.id"
+ " Where ex.status NOT IN ("
+ Status.SUCCEEDED.getNumVal() + ", "
@@ -91,8 +94,13 @@ public class FetchActiveFlowDao {
final ExecutableFlow exFlow =
ExecutableFlow.createExecutableFlowFromObject(
GZIPUtils.transformBytesToObject(data, encType));
-
- final Executor executor = new Executor(executorId, host, port, executorStatus);
+ final Executor executor;
+ if (host == null) {
+ logger.warn("Executor id " + executorId + " (on execution " + id + ") wasn't found");
+ executor = null;
+ } else {
+ executor = new Executor(executorId, host, port, executorStatus);
+ }
final ExecutionReference ref = new ExecutionReference(id, executor);
execFlows.put(id, new Pair<>(ref, exFlow));
} catch (final IOException e) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index e833bd6..31ff91c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -280,7 +280,11 @@ public class ExecutionFlowDaoTest {
this.fetchActiveFlowDao.fetchActiveFlows();
assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isTrue();
- assertThat(activeFlows1.containsKey(flow2.getExecutionId())).isFalse();
+ assertThat(activeFlows1.get(flow1.getExecutionId()).getFirst().getExecutor().isPresent())
+ .isTrue();
+ assertThat(activeFlows1.containsKey(flow2.getExecutionId())).isTrue();
+ assertThat(activeFlows1.get(flow2.getExecutionId()).getFirst().getExecutor().isPresent())
+ .isFalse();
final ExecutableFlow flow1Result =
activeFlows1.get(flow1.getExecutionId()).getSecond();
assertTwoFlowSame(flow1Result, flow1);
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 5ac23b4..67467fa 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.After;
@@ -357,12 +358,12 @@ public class ExecutorManagerTest {
@Test
public void testFetchActiveFlowWithExecutor() throws Exception {
testSetUpForRunningFlows();
- final List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
+ final List<Pair<ExecutableFlow, Optional<Executor>>> activeFlowsWithExecutor =
this.manager.getActiveFlowsWithExecutor();
Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(this.flow1,
- this.manager.fetchExecutor(this.flow1.getExecutionId()))));
+ Optional.ofNullable(this.manager.fetchExecutor(this.flow1.getExecutionId())))));
Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(this.flow2,
- this.manager.fetchExecutor(this.flow2.getExecutionId()))));
+ Optional.ofNullable(this.manager.fetchExecutor(this.flow2.getExecutionId())))));
}
@Test
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 6551df9..61a8325 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -57,6 +57,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@@ -357,7 +358,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
newPage(req, resp, session,
"azkaban/webapp/servlet/velocity/executionspage.vm");
- final List<Pair<ExecutableFlow, Executor>> runningFlows =
+ final List<Pair<ExecutableFlow, Optional<Executor>>> runningFlows =
this.executorManager.getActiveFlowsWithExecutor();
page.add("runningFlows", runningFlows.isEmpty() ? null : runningFlows);
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
index 385fc3b..2fbfefb 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
@@ -93,8 +93,8 @@
<a href="${context}/executor?execid=${flow.getFirst().executionId}">${flow.getFirst().executionId}</a>
</td>
<td>
- #if (${flow.getSecond()})
- ${flow.getSecond().getId()}
+ #if (${flow.getSecond().isPresent()})
+ ${flow.getSecond().get().getId()}
#else
-
#end