Details
diff --git a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
index c731146..52e7074 100644
--- a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
+++ b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
package azkaban.utils;
/**
diff --git a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
index 2b19eed..821d989 100644
--- a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
+++ b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
package azkaban.utils;
import java.util.Collections;
@@ -8,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+
/**
* A simple subclass of {@link ThreadPoolExecutor} to keep track of in progress
* tasks as well as other interesting statistics.
@@ -20,6 +37,8 @@ import java.util.concurrent.TimeUnit;
*/
public class TrackingThreadPool extends ThreadPoolExecutor {
+ private static Logger logger = Logger.getLogger(TrackingThreadPool.class);
+
private final Map<Runnable, Boolean> inProgress =
new ConcurrentHashMap<Runnable, Boolean>();
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
@@ -39,17 +58,20 @@ public class TrackingThreadPool extends ThreadPoolExecutor {
}
}
+ @Override
protected void beforeExecute(Thread t, Runnable r) {
try {
executingListener.beforeExecute(r);
- } finally {
+ } catch (Throwable e) {
// to ensure the listener doesn't cause any issues
+ logger.warn("Listener threw exception", e);
}
super.beforeExecute(t, r);
inProgress.put(r, Boolean.TRUE);
startTime.set(new Long(System.currentTimeMillis()));
}
+ @Override
protected void afterExecute(Runnable r, Throwable t) {
long time = System.currentTimeMillis() - startTime.get().longValue();
synchronized (this) {
@@ -60,8 +82,9 @@ public class TrackingThreadPool extends ThreadPoolExecutor {
super.afterExecute(r, t);
try {
executingListener.afterExecute(r);
- } finally {
+ } catch (Throwable e) {
// to ensure the listener doesn't cause any issues
+ logger.warn("Listener threw exception", e);
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 0f4c840..7c08c12 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -78,6 +78,8 @@ public class FlowRunnerManager implements EventListener,
private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 60 * 1000;
private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
+ private static final int DEFAULT_FLOW_NUM_JOB_TREADS = 10;
+
private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
private Map<Future<?>, Integer> submittedFlows =
@@ -93,7 +95,7 @@ public class FlowRunnerManager implements EventListener,
private TrackingThreadPool executorService;
private CleanerThread cleanerThread;
- private int numJobThreadPerFlow = 10;
+ private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
private ExecutorLoader executorLoader;
private ProjectLoader projectLoader;
@@ -145,7 +147,7 @@ public class FlowRunnerManager implements EventListener,
numThreads =
props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
numJobThreadPerFlow =
- props.getInt(FLOW_NUM_JOB_THREADS, numJobThreadPerFlow);
+ props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
executorService = createExecutorService(numThreads);
this.executorLoader = executorLoader;
@@ -588,7 +590,6 @@ public class FlowRunnerManager implements EventListener,
logger.info("Flow " + flow.getExecutionId()
+ " is finished. Adding it to recently finished flows list.");
runningFlows.remove(flow.getExecutionId());
- submittedFlows.remove(flow.getExecutionId());
}
}
@@ -758,7 +759,7 @@ public class FlowRunnerManager implements EventListener,
new ArrayList<Integer>(inProgressTasks.size());
for (Runnable task : inProgressTasks) {
- Integer execId = submittedFlows.get(task);
+ Integer execId = submittedFlows.get((Future<?>) task);
if (execId != null) {
runningFlowIds.add(execId);
} else {