azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
index c731146..52e7074 100644
--- a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
+++ b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
 package azkaban.utils;
 
 /**
diff --git a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
index 2b19eed..821d989 100644
--- a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
+++ b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
 package azkaban.utils;
 
 import java.util.Collections;
@@ -8,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.log4j.Logger;
+
 /**
  * A simple subclass of {@link ThreadPoolExecutor} to keep track of in progress
  * tasks as well as other interesting statistics.
@@ -20,6 +37,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class TrackingThreadPool extends ThreadPoolExecutor {
 
+  private static Logger logger = Logger.getLogger(TrackingThreadPool.class);
+
   private final Map<Runnable, Boolean> inProgress =
       new ConcurrentHashMap<Runnable, Boolean>();
   private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
@@ -39,17 +58,20 @@ public class TrackingThreadPool extends ThreadPoolExecutor {
     }
   }
 
+  @Override
   protected void beforeExecute(Thread t, Runnable r) {
     try {
       executingListener.beforeExecute(r);
-    } finally {
+    } catch (Throwable e) {
       // to ensure the listener doesn't cause any issues
+      logger.warn("Listener threw exception", e);
     }
     super.beforeExecute(t, r);
     inProgress.put(r, Boolean.TRUE);
     startTime.set(new Long(System.currentTimeMillis()));
   }
 
+  @Override
   protected void afterExecute(Runnable r, Throwable t) {
     long time = System.currentTimeMillis() - startTime.get().longValue();
     synchronized (this) {
@@ -60,8 +82,9 @@ public class TrackingThreadPool extends ThreadPoolExecutor {
     super.afterExecute(r, t);
     try {
       executingListener.afterExecute(r);
-    } finally {
+    } catch (Throwable e) {
       // to ensure the listener doesn't cause any issues
+      logger.warn("Listener threw exception", e);
     }
   }
 
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 0f4c840..7c08c12 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -78,6 +78,8 @@ public class FlowRunnerManager implements EventListener,
   private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 60 * 1000;
 
   private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
+  private static final int DEFAULT_FLOW_NUM_JOB_TREADS = 10;
+
   private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
       new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
   private Map<Future<?>, Integer> submittedFlows =
@@ -93,7 +95,7 @@ public class FlowRunnerManager implements EventListener,
   private TrackingThreadPool executorService;
 
   private CleanerThread cleanerThread;
-  private int numJobThreadPerFlow = 10;
+  private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
 
   private ExecutorLoader executorLoader;
   private ProjectLoader projectLoader;
@@ -145,7 +147,7 @@ public class FlowRunnerManager implements EventListener,
     numThreads =
         props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
     numJobThreadPerFlow =
-        props.getInt(FLOW_NUM_JOB_THREADS, numJobThreadPerFlow);
+        props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
     executorService = createExecutorService(numThreads);
 
     this.executorLoader = executorLoader;
@@ -588,7 +590,6 @@ public class FlowRunnerManager implements EventListener,
       logger.info("Flow " + flow.getExecutionId()
           + " is finished. Adding it to recently finished flows list.");
       runningFlows.remove(flow.getExecutionId());
-      submittedFlows.remove(flow.getExecutionId());
     }
   }
 
@@ -758,7 +759,7 @@ public class FlowRunnerManager implements EventListener,
         new ArrayList<Integer>(inProgressTasks.size());
 
     for (Runnable task : inProgressTasks) {
-      Integer execId = submittedFlows.get(task);
+      Integer execId = submittedFlows.get((Future<?>) task);
       if (execId != null) {
         runningFlowIds.add(execId);
       } else {