azkaban-aplcache

Revert "added license header and changes based on code review

7/14/2014 7:11:53 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
index 52e7074..c731146 100644
--- a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
+++ b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
@@ -1,18 +1,3 @@
-/*
- * Copyright 2012 LinkedIn Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
 package azkaban.utils;
 
 /**
diff --git a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
index 821d989..2b19eed 100644
--- a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
+++ b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
@@ -1,18 +1,3 @@
-/*
- * Copyright 2012 LinkedIn Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
 package azkaban.utils;
 
 import java.util.Collections;
@@ -23,8 +8,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.log4j.Logger;
-
 /**
  * A simple subclass of {@link ThreadPoolExecutor} to keep track of in progress
  * tasks as well as other interesting statistics.
@@ -37,8 +20,6 @@ import org.apache.log4j.Logger;
  */
 public class TrackingThreadPool extends ThreadPoolExecutor {
 
-  private static Logger logger = Logger.getLogger(TrackingThreadPool.class);
-
   private final Map<Runnable, Boolean> inProgress =
       new ConcurrentHashMap<Runnable, Boolean>();
   private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
@@ -58,20 +39,17 @@ public class TrackingThreadPool extends ThreadPoolExecutor {
     }
   }
 
-  @Override
   protected void beforeExecute(Thread t, Runnable r) {
     try {
       executingListener.beforeExecute(r);
-    } catch (Throwable e) {
+    } finally {
       // to ensure the listener doesn't cause any issues
-      logger.warn("Listener threw exception", e);
     }
     super.beforeExecute(t, r);
     inProgress.put(r, Boolean.TRUE);
     startTime.set(new Long(System.currentTimeMillis()));
   }
 
-  @Override
   protected void afterExecute(Runnable r, Throwable t) {
     long time = System.currentTimeMillis() - startTime.get().longValue();
     synchronized (this) {
@@ -82,9 +60,8 @@ public class TrackingThreadPool extends ThreadPoolExecutor {
     super.afterExecute(r, t);
     try {
       executingListener.afterExecute(r);
-    } catch (Throwable e) {
+    } finally {
       // to ensure the listener doesn't cause any issues
-      logger.warn("Listener threw exception", e);
     }
   }
 
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 7c08c12..0f4c840 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -78,8 +78,6 @@ public class FlowRunnerManager implements EventListener,
   private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 60 * 1000;
 
   private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
-  private static final int DEFAULT_FLOW_NUM_JOB_TREADS = 10;
-
   private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
       new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
   private Map<Future<?>, Integer> submittedFlows =
@@ -95,7 +93,7 @@ public class FlowRunnerManager implements EventListener,
   private TrackingThreadPool executorService;
 
   private CleanerThread cleanerThread;
-  private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
+  private int numJobThreadPerFlow = 10;
 
   private ExecutorLoader executorLoader;
   private ProjectLoader projectLoader;
@@ -147,7 +145,7 @@ public class FlowRunnerManager implements EventListener,
     numThreads =
         props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
     numJobThreadPerFlow =
-        props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
+        props.getInt(FLOW_NUM_JOB_THREADS, numJobThreadPerFlow);
     executorService = createExecutorService(numThreads);
 
     this.executorLoader = executorLoader;
@@ -590,6 +588,7 @@ public class FlowRunnerManager implements EventListener,
       logger.info("Flow " + flow.getExecutionId()
           + " is finished. Adding it to recently finished flows list.");
       runningFlows.remove(flow.getExecutionId());
+      submittedFlows.remove(flow.getExecutionId());
     }
   }
 
@@ -759,7 +758,7 @@ public class FlowRunnerManager implements EventListener,
         new ArrayList<Integer>(inProgressTasks.size());
 
     for (Runnable task : inProgressTasks) {
-      Integer execId = submittedFlows.get((Future<?>) task);
+      Integer execId = submittedFlows.get(task);
       if (execId != null) {
         runningFlowIds.add(execId);
       } else {