Details
diff --git a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
index 52e7074..c731146 100644
--- a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
+++ b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
@@ -1,18 +1,3 @@
-/*
- * Copyright 2012 LinkedIn Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
package azkaban.utils;
/**
diff --git a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
index 821d989..2b19eed 100644
--- a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
+++ b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
@@ -1,18 +1,3 @@
-/*
- * Copyright 2012 LinkedIn Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
package azkaban.utils;
import java.util.Collections;
@@ -23,8 +8,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.log4j.Logger;
-
/**
* A simple subclass of {@link ThreadPoolExecutor} to keep track of in progress
* tasks as well as other interesting statistics.
@@ -37,8 +20,6 @@ import org.apache.log4j.Logger;
*/
public class TrackingThreadPool extends ThreadPoolExecutor {
- private static Logger logger = Logger.getLogger(TrackingThreadPool.class);
-
private final Map<Runnable, Boolean> inProgress =
new ConcurrentHashMap<Runnable, Boolean>();
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
@@ -58,20 +39,17 @@ public class TrackingThreadPool extends ThreadPoolExecutor {
}
}
- @Override
protected void beforeExecute(Thread t, Runnable r) {
try {
executingListener.beforeExecute(r);
- } catch (Throwable e) {
+ } finally {
// to ensure the listener doesn't cause any issues
- logger.warn("Listener threw exception", e);
}
super.beforeExecute(t, r);
inProgress.put(r, Boolean.TRUE);
startTime.set(new Long(System.currentTimeMillis()));
}
- @Override
protected void afterExecute(Runnable r, Throwable t) {
long time = System.currentTimeMillis() - startTime.get().longValue();
synchronized (this) {
@@ -82,9 +60,8 @@ public class TrackingThreadPool extends ThreadPoolExecutor {
super.afterExecute(r, t);
try {
executingListener.afterExecute(r);
- } catch (Throwable e) {
+ } finally {
// to ensure the listener doesn't cause any issues
- logger.warn("Listener threw exception", e);
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 7c08c12..0f4c840 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -78,8 +78,6 @@ public class FlowRunnerManager implements EventListener,
private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 60 * 1000;
private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
- private static final int DEFAULT_FLOW_NUM_JOB_TREADS = 10;
-
private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
private Map<Future<?>, Integer> submittedFlows =
@@ -95,7 +93,7 @@ public class FlowRunnerManager implements EventListener,
private TrackingThreadPool executorService;
private CleanerThread cleanerThread;
- private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
+ private int numJobThreadPerFlow = 10;
private ExecutorLoader executorLoader;
private ProjectLoader projectLoader;
@@ -147,7 +145,7 @@ public class FlowRunnerManager implements EventListener,
numThreads =
props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
numJobThreadPerFlow =
- props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
+ props.getInt(FLOW_NUM_JOB_THREADS, numJobThreadPerFlow);
executorService = createExecutorService(numThreads);
this.executorLoader = executorLoader;
@@ -590,6 +588,7 @@ public class FlowRunnerManager implements EventListener,
logger.info("Flow " + flow.getExecutionId()
+ " is finished. Adding it to recently finished flows list.");
runningFlows.remove(flow.getExecutionId());
+ submittedFlows.remove(flow.getExecutionId());
}
}
@@ -759,7 +758,7 @@ public class FlowRunnerManager implements EventListener,
new ArrayList<Integer>(inProgressTasks.size());
for (Runnable task : inProgressTasks) {
- Integer execId = submittedFlows.get((Future<?>) task);
+ Integer execId = submittedFlows.get(task);
if (execId != null) {
runningFlowIds.add(execId);
} else {