azkaban-aplcache

Revert "Issue #280: flowRunnerManager JMX bean : the NumExecutingFlows

7/14/2014 5:16:24 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 8d230ba..0d98a70 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -34,9 +34,10 @@ import java.util.Map;
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
+
 import org.joda.time.DateTime;
 
 import azkaban.database.AbstractJdbcLoader;
@@ -167,11 +168,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       List<ExecutableFlow> properties =
           runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler,
               id);
-      if (properties.isEmpty()) {
-        return null;
-      } else {
-        return properties.get(0);
-      }
+      return properties.get(0);
     } catch (SQLException e) {
       throw new ExecutorManagerException("Error fetching flow id " + id, e);
     }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index f73a30a..123f9e4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -60,7 +60,7 @@ import azkaban.utils.SwapQueue;
 
 /**
  * Class that handles the running of a ExecutableFlow DAG
- * 
+ *
  */
 public class FlowRunner extends EventHandler implements Runnable {
   private static final Layout DEFAULT_LAYOUT = new PatternLayout(
@@ -122,7 +122,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Constructor. This will create its own ExecutorService for thread pools
-   * 
+   *
    * @param flow
    * @param executorLoader
    * @param projectLoader
@@ -138,7 +138,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   /**
    * Constructor. If executorService is null, then it will create it's own for
    * thread pools.
-   * 
+   *
    * @param flow
    * @param executorLoader
    * @param projectLoader
@@ -356,7 +356,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Main method that executes the jobs.
-   * 
+   *
    * @throws Exception
    */
   private void runFlow() throws Exception {
@@ -725,7 +725,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   /**
    * Determines what the state of the next node should be. Returns null if the
    * node should not be run.
-   * 
+   *
    * @param node
    * @return
    */
@@ -1094,8 +1094,4 @@ public class FlowRunner extends EventHandler implements Runnable {
   public int getNumRunningJobs() {
     return activeJobRunners.size();
   }
-
-  public int getExecutionId() {
-    return execId;
-  }
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 0f4c840..a044f70 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -27,16 +27,16 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
+import azkaban.project.ProjectLoader;
 import azkaban.event.Event;
 import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
@@ -48,28 +48,18 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
-import azkaban.project.ProjectLoader;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
-import azkaban.utils.ThreadPoolExecutingListener;
-import azkaban.utils.TrackingThreadPool;
 
 /**
  * Execution manager for the server side execution.
- * 
+ *
  */
-public class FlowRunnerManager implements EventListener,
-    ThreadPoolExecutingListener {
-  private static final String EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE =
-      "executor.use.bounded.threadpool.queue";
-  private static final String EXECUTOR_THREADPOOL_WORKQUEUE_SIZE =
-      "executor.threadpool.workqueue.size";
-  private static final String EXECUTOR_FLOW_THREADS = "executor.flow.threads";
-  private static final String FLOW_NUM_JOB_THREADS = "flow.num.job.threads";
+public class FlowRunnerManager implements EventListener {
   private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
   private File executionDirectory;
   private File projectDirectory;
@@ -80,18 +70,16 @@ public class FlowRunnerManager implements EventListener,
   private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
   private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
       new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
-  private Map<Future<?>, Integer> submittedFlows =
-      new ConcurrentHashMap<Future<?>, Integer>();
   private Map<Integer, FlowRunner> runningFlows =
       new ConcurrentHashMap<Integer, FlowRunner>();
   private Map<Integer, ExecutableFlow> recentlyFinishedFlows =
       new ConcurrentHashMap<Integer, ExecutableFlow>();
-
+  private LinkedBlockingQueue<FlowRunner> flowQueue =
+      new LinkedBlockingQueue<FlowRunner>();
   private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
-  private int threadPoolQueueSize = -1;
-
-  private TrackingThreadPool executorService;
 
+  private ExecutorService executorService;
+  private SubmitterThread submitterThread;
   private CleanerThread cleanerThread;
   private int numJobThreadPerFlow = 10;
 
@@ -104,6 +92,7 @@ public class FlowRunnerManager implements EventListener,
 
   private final Props azkabanProps;
 
+  private long lastSubmitterThreadCheckTime = -1;
   private long lastCleanerThreadCheckTime = -1;
   private long executionDirRetention = 1 * 24 * 60 * 60 * 1000;
 
@@ -143,10 +132,10 @@ public class FlowRunnerManager implements EventListener,
 
     // azkaban.temp.dir
     numThreads =
-        props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
+        props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
     numJobThreadPerFlow =
-        props.getInt(FLOW_NUM_JOB_THREADS, numJobThreadPerFlow);
-    executorService = createExecutorService(numThreads);
+        props.getInt("flow.num.job.threads", numJobThreadPerFlow);
+    executorService = Executors.newFixedThreadPool(numThreads);
 
     this.executorLoader = executorLoader;
     this.projectLoader = projectLoader;
@@ -157,6 +146,9 @@ public class FlowRunnerManager implements EventListener,
     this.validateProxyUser =
         azkabanProps.getBoolean("proxy.user.lock.down", false);
 
+    submitterThread = new SubmitterThread(flowQueue);
+    submitterThread.start();
+
     cleanerThread = new CleanerThread();
     cleanerThread.start();
 
@@ -173,32 +165,6 @@ public class FlowRunnerManager implements EventListener,
             parentClassLoader);
   }
 
-  private TrackingThreadPool createExecutorService(int nThreads) {
-    boolean useNewThreadPool =
-        azkabanProps.getBoolean(EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE, false);
-    logger.info("useNewThreadPool: " + useNewThreadPool);
-
-    if (useNewThreadPool) {
-      threadPoolQueueSize =
-          azkabanProps.getInt(EXECUTOR_THREADPOOL_WORKQUEUE_SIZE, nThreads);
-      logger.info("workQueueSize: " + threadPoolQueueSize);
-
-      // using a bounded queue for the work queue. The default rejection policy
-      // {@ThreadPoolExecutor.AbortPolicy} is used
-      TrackingThreadPool executor =
-          new TrackingThreadPool(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
-              new LinkedBlockingQueue<Runnable>(threadPoolQueueSize), this);
-
-      return executor;
-    } else {
-      // the old way of using unbounded task queue.
-      // if the running tasks are taking a long time or stuck, this queue
-      // will be very very long.
-      return new TrackingThreadPool(nThreads, nThreads, 0L,
-          TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), this);
-    }
-  }
-
   private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
     Map<Pair<Integer, Integer>, ProjectVersion> allProjects =
         new HashMap<Pair<Integer, Integer>, ProjectVersion>();
@@ -236,6 +202,34 @@ public class FlowRunnerManager implements EventListener,
     this.globalProps = globalProps;
   }
 
+  private class SubmitterThread extends Thread {
+    private BlockingQueue<FlowRunner> queue;
+    private boolean shutdown = false;
+
+    public SubmitterThread(BlockingQueue<FlowRunner> queue) {
+      this.setName("FlowRunnerManager-Submitter-Thread");
+      this.queue = queue;
+    }
+
+    @SuppressWarnings("unused")
+    public void shutdown() {
+      shutdown = true;
+      this.interrupt();
+    }
+
+    public void run() {
+      while (!shutdown) {
+        try {
+          lastSubmitterThreadCheckTime = System.currentTimeMillis();
+          FlowRunner flowRunner = queue.take();
+          executorService.submit(flowRunner);
+        } catch (InterruptedException e) {
+          logger.info("Interrupted. Probably to shut down.");
+        }
+      }
+    }
+  }
+
   private class CleanerThread extends Thread {
     // Every hour, clean execution dir.
     private static final long EXECUTION_DIR_CLEAN_INTERVAL_MS = 60 * 60 * 1000;
@@ -436,18 +430,18 @@ public class FlowRunnerManager implements EventListener,
     }
 
     int numJobThreads = numJobThreadPerFlow;
-    if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
+    if (options.getFlowParameters().containsKey("flow.num.job.threads")) {
       try {
         int numJobs =
             Integer.valueOf(options.getFlowParameters().get(
-                FLOW_NUM_JOB_THREADS));
+                "flow.num.job.threads"));
         if (numJobs > 0 && numJobs <= numJobThreads) {
           numJobThreads = numJobs;
         }
       } catch (Exception e) {
         throw new ExecutorManagerException(
             "Failed to set the number of job threads "
-                + options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
+                + options.getFlowParameters().get("flow.num.job.threads")
                 + " for flow " + execId, e);
       }
     }
@@ -467,18 +461,7 @@ public class FlowRunnerManager implements EventListener,
 
     // Finally, queue the sucker.
     runningFlows.put(execId, runner);
-
-    try {
-      // The executorService already has a queue
-      Future<?> future = executorService.submit(runner);
-      // keep track of this future
-      submittedFlows.put(future, runner.getExecutionId());
-    } catch (RejectedExecutionException re) {
-      throw new ExecutorManagerException(
-          "Azkaban server can't execute any more flows. "
-              + "The number of running flows has reached the system configured limit."
-              + "Please notify Azkaban administrators");
-    }
+    flowQueue.add(runner);
   }
 
   private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
@@ -588,7 +571,6 @@ public class FlowRunnerManager implements EventListener,
       logger.info("Flow " + flow.getExecutionId()
           + " is finished. Adding it to recently finished flows list.");
       runningFlows.remove(flow.getExecutionId());
-      submittedFlows.remove(flow.getExecutionId());
     }
   }
 
@@ -731,10 +713,22 @@ public class FlowRunnerManager implements EventListener,
     return lastCleanerThreadCheckTime;
   }
 
+  public long getLastSubmitterThreadCheckTime() {
+    return lastSubmitterThreadCheckTime;
+  }
+
+  public boolean isSubmitterThreadActive() {
+    return this.submitterThread.isAlive();
+  }
+
   public boolean isCleanerThreadActive() {
     return this.cleanerThread.isAlive();
   }
 
+  public State getSubmitterThreadState() {
+    return this.submitterThread.getState();
+  }
+
   public State getCleanerThreadState() {
     return this.cleanerThread.getState();
   }
@@ -743,73 +737,26 @@ public class FlowRunnerManager implements EventListener,
     return executorService.isShutdown();
   }
 
-  public int getNumQueuedFlows() {
-    return executorService.getQueue().size();
-  }
-
-  public int getNumRunningFlows() {
-    return executorService.getActiveCount();
+  public int getNumExecutingFlows() {
+    return runningFlows.size();
   }
 
   public String getRunningFlowIds() {
-    Set<Runnable> inProgressTasks = executorService.getInProgressTasks();
-
-    List<Integer> runningFlowIds =
-        new ArrayList<Integer>(inProgressTasks.size());
-
-    for (Runnable task : inProgressTasks) {
-      Integer execId = submittedFlows.get(task);
-      if (execId != null) {
-        runningFlowIds.add(execId);
-      } else {
-        logger.warn("getRunningFlowIds: got null execId for task: " + task);
-      }
-    }
-
-    Collections.sort(runningFlowIds);
-    return runningFlowIds.toString();
+    ArrayList<Integer> ids = new ArrayList<Integer>(runningFlows.keySet());
+    Collections.sort(ids);
+    return ids.toString();
   }
 
-  public String getQueuedFlowIds() {
-    List<Integer> flowIdList =
-        new ArrayList<Integer>(executorService.getQueue().size());
-
-    for (Runnable task : executorService.getQueue()) {
-      Integer execId = submittedFlows.get(task);
-      if (execId != null) {
-        flowIdList.add(execId);
-      } else {
-        logger
-            .warn("getQueuedFlowIds: got null execId for queuedTask: " + task);
-      }
+  public int getNumExecutingJobs() {
+    int jobCount = 0;
+    for (FlowRunner runner : runningFlows.values()) {
+      jobCount += runner.getNumRunningJobs();
     }
-    Collections.sort(flowIdList);
-    return flowIdList.toString();
-  }
 
-  public int getMaxNumRunningFlows() {
-    return numThreads;
-  }
-
-  public int getTheadPoolQueueSize() {
-    return threadPoolQueueSize;
+    return jobCount;
   }
 
   public void reloadJobTypePlugins() throws JobTypeManagerException {
     jobtypeManager.loadPlugins();
   }
-
-  public int getTotalNumExecutedFlows() {
-    return executorService.getTotalTasks();
-  }
-
-  @Override
-  public void beforeExecute(Runnable r) {
-  }
-
-  @Override
-  public void afterExecute(Runnable r) {
-    submittedFlows.remove(r);
-  }
-
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java
index 02ab2be..2f4ccb2 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java
@@ -31,52 +31,48 @@ public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
   }
 
   @Override
-  public boolean isCleanerThreadActive() {
-    return manager.isCleanerThreadActive();
+  public long getLastSubmitterThreadCheckTime() {
+    return manager.getLastSubmitterThreadCheckTime();
   }
 
   @Override
-  public String getCleanerThreadState() {
-    return manager.getCleanerThreadState().toString();
+  public boolean isSubmitterThreadActive() {
+    return manager.isSubmitterThreadActive();
   }
 
   @Override
-  public boolean isExecutorThreadPoolShutdown() {
-    return manager.isExecutorThreadPoolShutdown();
+  public boolean isCleanerThreadActive() {
+    return manager.isCleanerThreadActive();
   }
 
   @Override
-  public int getNumRunningFlows() {
-    return manager.getNumRunningFlows();
+  public String getSubmitterThreadState() {
+    return manager.getSubmitterThreadState().toString();
   }
 
   @Override
-  public int getNumQueuedFlows() {
-    return manager.getNumQueuedFlows();
+  public String getCleanerThreadState() {
+    return manager.getCleanerThreadState().toString();
   }
 
   @Override
-  public String getRunningFlows() {
-    return manager.getRunningFlowIds();
+  public boolean isExecutorThreadPoolShutdown() {
+    return manager.isExecutorThreadPoolShutdown();
   }
 
   @Override
-  public String getQueuedFlows() {
-    return manager.getQueuedFlowIds();
+  public int getNumExecutingFlows() {
+    return manager.getNumExecutingFlows();
   }
 
   @Override
-  public int getMaxNumRunningFlows() {
-    return manager.getMaxNumRunningFlows();
+  public int countTotalNumRunningJobs() {
+    return manager.getNumExecutingJobs();
   }
 
   @Override
-  public int getMaxQueuedFlows() {
-    return manager.getTheadPoolQueueSize();
+  public String getRunningFlows() {
+    return manager.getRunningFlowIds();
   }
 
-  @Override
-  public int getTotalNumExecutedFlows() {
-    return manager.getTotalNumExecutedFlows();
-  }
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java
index 71212a8..1709aee 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java
@@ -22,34 +22,30 @@ public interface JmxFlowRunnerManagerMBean {
   @DisplayName("OPERATION: getLastCleanerThreadCheckTime")
   public long getLastCleanerThreadCheckTime();
 
+  @DisplayName("OPERATION: getLastSubmitterThreadCheckTime")
+  public long getLastSubmitterThreadCheckTime();
+
+  @DisplayName("OPERATION: isSubmitterThreadActive")
+  public boolean isSubmitterThreadActive();
+
   @DisplayName("OPERATION: isCleanerThreadActive")
   public boolean isCleanerThreadActive();
 
+  @DisplayName("OPERATION: getSubmitterThreadState")
+  public String getSubmitterThreadState();
+
   @DisplayName("OPERATION: getCleanerThreadState")
   public String getCleanerThreadState();
 
   @DisplayName("OPERATION: isExecutorThreadPoolShutdown")
   public boolean isExecutorThreadPoolShutdown();
 
-  @DisplayName("OPERATION: getNumRunningFlows")
-  public int getNumRunningFlows();
-
-  @DisplayName("OPERATION: getNumQueuedFlows")
-  public int getNumQueuedFlows();
+  @DisplayName("OPERATION: getNumExecutingFlows")
+  public int getNumExecutingFlows();
 
   @DisplayName("OPERATION: getRunningFlows")
   public String getRunningFlows();
 
-  @DisplayName("OPERATION: getQueuedFlows")
-  public String getQueuedFlows();
-
-  @DisplayName("OPERATION: getMaxNumRunningFlows")
-  public int getMaxNumRunningFlows();
-
-  @DisplayName("OPERATION: getMaxQueuedFlows")
-  public int getMaxQueuedFlows();
-
-  @DisplayName("OPERATION: getTotalNumExecutedFlows")
-  public int getTotalNumExecutedFlows();
-
+  @DisplayName("OPERATION: getTotalNumRunningJobs")
+  public int countTotalNumRunningJobs();
 }
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 21319d7..c5ae77c 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -27,14 +27,12 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.log4j.Logger;
-
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
@@ -46,8 +44,8 @@ import azkaban.scheduler.ScheduleManagerException;
 import azkaban.server.HttpRequestUtils;
 import azkaban.server.session.Session;
 import azkaban.user.Permission;
-import azkaban.user.Permission.Type;
 import azkaban.user.User;
+import azkaban.user.Permission.Type;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.plugin.PluginRegistry;
@@ -55,9 +53,6 @@ import azkaban.webapp.plugin.ViewerPlugin;
 
 public class ExecutorServlet extends LoginAbstractAzkabanServlet {
   private static final long serialVersionUID = 1L;
-
-  private static final Logger logger = Logger.getLogger(ExecutorServlet.class);
-
   private ProjectManager projectManager;
   private ExecutorManagerAdapter executorManager;
   private ScheduleManager scheduleManager;
@@ -354,7 +349,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 
   /**
    * Gets the logs through plain text stream to reduce memory overhead.
-   * 
+   *
    * @param req
    * @param resp
    * @param user
@@ -394,7 +389,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 
   /**
    * Gets the logs through ajax plain text stream to reduce memory overhead.
-   * 
+   *
    * @param req
    * @param resp
    * @param user
@@ -714,7 +709,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
       HttpServletResponse resp, HashMap<String, Object> ret, User user,
       ExecutableFlow exFlow) throws ServletException {
     Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
-    logger.info("Fetching " + exFlow.getExecutionId());
+    System.out.println("Fetching " + exFlow.getExecutionId());
 
     Project project =
         getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
@@ -734,7 +729,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
   private void ajaxFetchExecutableFlow(HttpServletRequest req,
       HttpServletResponse resp, HashMap<String, Object> ret, User user,
       ExecutableFlow exFlow) throws ServletException {
-    logger.info("Fetching " + exFlow.getExecutionId());
+    System.out.println("Fetching " + exFlow.getExecutionId());
 
     Project project =
         getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);