azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index 645401e..a36d33d 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.log4j.Logger;
 
+import azkaban.utils.Pair;
 import azkaban.utils.Props;
 
 public class JavaProcessJob extends ProcessJob {
@@ -147,4 +148,13 @@ public class JavaProcessJob extends ProcessJob {
 
     return "";
   }
-}
+
+  protected Pair<Long, Long> getProcMemoryRequirement() {
+    String strInitMemSize = getInitialMemorySize();
+    String strMaxMemSize = getMaxMemorySize();
+    long initMemSize = azkaban.utils.Utils.parseMemString2KB(strInitMemSize);
+    long maxMemSize = azkaban.utils.Utils.parseMemString2KB(strMaxMemSize);
+
+    return new Pair<Long, Long>(initMemSize, maxMemSize);
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index bf856fb..43f26f3 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -26,7 +26,9 @@ import org.apache.log4j.Logger;
 
 import azkaban.jobExecutor.utils.process.AzkabanProcess;
 import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
+import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import azkaban.utils.SystemMemoryInfo;
 
 /**
  * A job that runs a simple unix command
@@ -50,6 +52,13 @@ public class ProcessJob extends AbstractProcessJob {
       handleError("Bad property definition! " + e.getMessage(), e);
     }
 
+    Pair<Long, Long> memPair = getProcMemoryRequirement();
+    boolean memGranted = SystemMemoryInfo.requestMemory(memPair.getFirst(), memPair.getSecond());
+    if (!memGranted) {
+      throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+              memPair.getFirst(), memPair.getSecond(), getId()));
+    }
+
     List<String> commands = null;
     try {
       commands = getCommandList();
@@ -101,6 +110,10 @@ public class ProcessJob extends AbstractProcessJob {
     generateProperties(propFiles[1]);
   }
 
+  protected Pair<Long, Long> getProcMemoryRequirement() {
+    return new Pair<Long, Long>(0L, 0L);
+  }
+
   protected void handleError(String errorMsg, Exception e) throws Exception {
     error(errorMsg);
     if (e != null) {
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
index 1254911..52d376c 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
@@ -132,7 +132,7 @@ public class XmlValidatorManager implements ValidatorManager {
   public void loadValidators(Props props, Logger log) {
     validators = new LinkedHashMap<String, ProjectValidator>();
     // Add the default validator
-    DirectoryFlowLoader flowLoader = new DirectoryFlowLoader(log);
+    DirectoryFlowLoader flowLoader = new DirectoryFlowLoader(props, log);
     validators.put(flowLoader.getValidatorName(), flowLoader);
 
     if (!props.containsKey(ValidatorConfigs.XML_FILE_PARAM)) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
index 0ccd1dc..36ac7ca 100644
--- a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
@@ -44,8 +44,15 @@ public class DirectoryFlowLoader implements ProjectValidator {
   private static final DirFilter DIR_FILTER = new DirFilter();
   private static final String PROPERTY_SUFFIX = ".properties";
   private static final String JOB_SUFFIX = ".job";
+  private static final String JOB_MAX_XMS = "job.max.Xms";
+  private static final String MAX_XMS_DEFAULT = "1G";
+  private static final String JOB_MAX_XMX = "job.max.Xmx";
+  private static final String MAX_XMX_DEFAULT = "2G";
+  private static final String XMS = "Xms";
+  private static final String XMX = "Xmx";
 
   private final Logger logger;
+  private Props props;
   private HashSet<String> rootNodes;
   private HashMap<String, Flow> flowMap;
   private HashMap<String, Node> nodeMap;
@@ -60,8 +67,9 @@ public class DirectoryFlowLoader implements ProjectValidator {
   private Set<String> errors;
   private Set<String> duplicateJobs;
 
-  public DirectoryFlowLoader(Logger logger) {
+  public DirectoryFlowLoader(Props props, Logger logger) {
     this.logger = logger;
+    this.props = props;
   }
 
   public Map<String, Flow> getFlowMap() {
@@ -95,6 +103,8 @@ public class DirectoryFlowLoader implements ProjectValidator {
     // Load all the props files and create the Node objects
     loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);
 
+    jobPropertiesCheck();
+
     // Create edges and find missing dependencies
     resolveDependencies();
 
@@ -366,6 +376,27 @@ public class DirectoryFlowLoader implements ProjectValidator {
     visited.remove(node.getId());
   }
 
+  private void jobPropertiesCheck() {
+    String maxXms = props.getString(JOB_MAX_XMS, MAX_XMS_DEFAULT);
+    String maxXmx = props.getString(JOB_MAX_XMX, MAX_XMX_DEFAULT);
+    long sizeMaxXms = azkaban.utils.Utils.parseMemString2KB(maxXms);
+    long sizeMaxXmx = azkaban.utils.Utils.parseMemString2KB(maxXmx);
+
+    for (String jobName : jobPropsMap.keySet()) {
+      Props jobProps = jobPropsMap.get(jobName);
+      String xms = jobProps.getString(XMS, null);
+      if (xms != null && azkaban.utils.Utils.parseMemString2KB(xms) > sizeMaxXms) {
+        errors.add(String.format("%s: Xms value has exceeded the allowed limit (max Xms = %s)",
+                jobName, maxXms));
+      }
+      String xmx = jobProps.getString(XMX, null);
+      if (xmx != null && azkaban.utils.Utils.parseMemString2KB(xmx) > sizeMaxXmx) {
+        errors.add(String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
+                jobName, maxXmx));
+      }
+    }
+  }
+
   private String getNameWithoutExtension(File file) {
     String filename = file.getName();
     int index = filename.lastIndexOf('.');
diff --git a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
new file mode 100644
index 0000000..63ae7a7
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
@@ -0,0 +1,165 @@
+package azkaban.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+/**
+ * @author wkang
+ * This class is used to maintain system memory information. Processes utilizing
+ * large amount of memory should consult this class to see if the system has enough
+ * memory to proceed the operation.
+ * 
+ * Memory information is obtained from /proc/meminfo, so only Unix/Linux like system
+ * will support this class.
+ * 
+ * All the memory size used in this function is in KB
+ */
+public class SystemMemoryInfo {
+  private static final Logger logger = Logger.getLogger(SystemMemoryInfo.class);
+
+  private static String MEMINFO_FILE = "/proc/meminfo"; 
+  private static boolean memCheckEnabled;
+  private static boolean memInfoExists;
+  private static long freeMemAmount = 0;
+  private static long freeMemDecrAmt = 0;
+  private static final long LOW_MEM_THRESHOLD = 3L*1024L*1024L; //3 GB
+
+  private static ScheduledExecutorService scheduledExecutorService;
+
+  public static void init(boolean memChkEnabled, long memDecrAmt) {
+    File f = new File(MEMINFO_FILE);
+    memInfoExists = f.exists() && !f.isDirectory();
+    memCheckEnabled = memChkEnabled && memInfoExists;
+    if (memCheckEnabled) {
+      //initial reading of the mem info
+      readMemoryInfoFile();
+      freeMemDecrAmt = memDecrAmt;
+
+      //schedule a thread to read it
+      logger.info("Scheduled thread to read /proc/meminfo every 30 seconds");
+      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+      scheduledExecutorService.scheduleAtFixedRate(new MemoryInfoReader(), 0, 30, TimeUnit.SECONDS);
+    }
+  }
+
+  public synchronized static boolean requestMemory(long xms, long xmx) {
+    if (!memCheckEnabled) {
+      return true;
+    }
+
+    //too small amount of memory left, reject
+    if (freeMemAmount < LOW_MEM_THRESHOLD) {
+      logger.info(String.format("Free memory amount (%d kb) is less than low mem threshold (%d kb),  memory request declined.",
+              freeMemAmount, LOW_MEM_THRESHOLD));
+      return false;
+    }
+
+    //let's get newest mem info
+    if (freeMemAmount >= LOW_MEM_THRESHOLD && freeMemAmount < 2 * LOW_MEM_THRESHOLD) {
+      logger.info(String.format("Free memory amount (%d kb) is less than 2x low mem threshold (%d kb),  re-read /proc/meminfo",
+              freeMemAmount, LOW_MEM_THRESHOLD));
+      readMemoryInfoFile();
+    }
+
+    //too small amount of memory left, reject
+    if (freeMemAmount < LOW_MEM_THRESHOLD) {
+      logger.info(String.format("Free memory amount (%d kb) is less than low mem threshold (%d kb),  memory request declined.",
+              freeMemAmount, LOW_MEM_THRESHOLD));
+      return false;
+    }
+
+    if (freeMemAmount - xmx < LOW_MEM_THRESHOLD) {
+      logger.info(String.format("Free memory amount minus xmx (%d - %d kb) is less than low mem threshold (%d kb),  memory request declined.",
+              freeMemAmount, xmx, LOW_MEM_THRESHOLD));
+      return false;
+    }
+
+    if (freeMemDecrAmt > 0) {
+      freeMemAmount -= freeMemDecrAmt;
+    } else {
+      freeMemAmount -= xms;
+    }
+    
+    logger.info("Current free memory amount is " + freeMemAmount);
+    return true;
+  }
+
+  private synchronized static void updateFreeMemAmount(long size) {
+    freeMemAmount = size;
+  }
+
+  private static void readMemoryInfoFile() {
+    BufferedReader br = null;
+    try {
+      br = new BufferedReader(new FileReader(MEMINFO_FILE));
+
+      long sizeMemFree = 0;
+      long sizeBuffers = 0;
+      long sizeCached = 0;
+      long sizeSwapCached = 0;
+      int count = 0;
+      String line = br.readLine();
+      while (line != null) {
+        if (line.startsWith("MemFree:") || line.startsWith("Buffers:")
+            || line.startsWith("Cached") || line.startsWith("SwapCached")) {
+          int idx1 = line.indexOf(":");
+          int idx2 = line.lastIndexOf("kB");
+          String strSize = line.substring(idx1+1, idx2-1).trim();
+          
+          if (line.startsWith("MemFree:")) {
+            sizeMemFree = Long.parseLong(strSize);
+          } else if (line.startsWith("Buffers:")) {
+            sizeBuffers = Long.parseLong(strSize);
+          } else if (line.startsWith("Cached")) {
+            sizeCached = Long.parseLong(strSize);
+          } else if (line.startsWith("SwapCached")) {
+            sizeSwapCached = Long.parseLong(strSize);
+          }
+
+          //all lines read
+          if (++count == 4) {
+            break;
+          }
+        }
+        line = br.readLine();
+      }
+      
+      if (count < 4) {
+        logger.error("Error: less than 5 rows read for free memory infor");
+      }
+
+      long sizeTotal = sizeMemFree + sizeBuffers + sizeCached + sizeSwapCached;
+
+      logger.info(String.format("Current system free memory is %d (MemFree %d, Buffers %d, Cached %d, SwapCached %d)",
+              sizeTotal, sizeMemFree, sizeBuffers, sizeCached, sizeSwapCached));
+
+      if (sizeTotal > 0) {
+        updateFreeMemAmount(sizeTotal);
+      }
+    } catch (IOException e) {
+      logger.error("Exception in reading memory info file", e);
+    } finally {
+      try {
+        if (br != null) {
+          br.close();
+        }
+      } catch (IOException e) {
+        logger.error("Exception in closing the buffered reader", e);
+      }
+    }
+  }
+
+  static class MemoryInfoReader implements Runnable {
+    @Override
+    public void run() {
+      readMemoryInfoFile();
+    }
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index 7c417ca..ed9fc52 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -35,7 +35,6 @@ import java.util.zip.ZipFile;
 import java.util.zip.ZipOutputStream;
 
 import org.apache.commons.io.IOUtils;
-
 import org.joda.time.Days;
 import org.joda.time.DurationFieldType;
 import org.joda.time.Hours;
@@ -409,4 +408,27 @@ public class Utils {
 
     return periodStr;
   }
+
+  public static long parseMemString2KB(String strMemSize) {
+    if (strMemSize == null) {
+      return 0L;
+    }
+
+    long size = 0L;
+    if (strMemSize.endsWith("g") || strMemSize.endsWith("G")
+        || strMemSize.endsWith("m") || strMemSize.endsWith("M")
+        || strMemSize.endsWith("k") || strMemSize.endsWith("K")) {
+      String strSize = strMemSize.substring(0, strMemSize.length() - 1);
+      size = Long.parseLong(strSize);     
+    }
+
+    long sizeInKb = 0L;
+    if (strMemSize.endsWith("g") || strMemSize.endsWith("G")) {
+      sizeInKb = size * 1024L * 1024L;
+    } else if (strMemSize.endsWith("m") || strMemSize.endsWith("M")) {
+      sizeInKb = size * 1024L;
+    }
+
+    return sizeInKb;
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
index 8997fee..5948c7b 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.log4j.Logger;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -37,6 +36,7 @@ import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.utils.DirectoryFlowLoader;
 import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
 
 public class ExecutableFlowTest {
   private Project project;
@@ -44,7 +44,7 @@ public class ExecutableFlowTest {
   @Before
   public void setUp() throws Exception {
     Logger logger = Logger.getLogger(this.getClass());
-    DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+    DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
     loader.loadProjectFlow(new File("unit/executions/embedded"));
     Assert.assertEquals(0, loader.getErrors().size());
 
diff --git a/azkaban-common/src/test/java/azkaban/utils/DirectoryFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/utils/DirectoryFlowLoaderTest.java
index f658bb5..b45b853 100644
--- a/azkaban-common/src/test/java/azkaban/utils/DirectoryFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/DirectoryFlowLoaderTest.java
@@ -29,7 +29,7 @@ public class DirectoryFlowLoaderTest {
   @Ignore @Test
   public void testDirectoryLoad() {
     Logger logger = Logger.getLogger(this.getClass());
-    DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+    DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
 
     loader.loadProjectFlow(new File("unit/executions/exectest1"));
     logger.info(loader.getFlowMap().size());
@@ -38,7 +38,7 @@ public class DirectoryFlowLoaderTest {
   @Ignore @Test
   public void testLoadEmbeddedFlow() {
     Logger logger = Logger.getLogger(this.getClass());
-    DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+    DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
 
     loader.loadProjectFlow(new File("unit/executions/embedded"));
     Assert.assertEquals(0, loader.getErrors().size());
@@ -47,7 +47,7 @@ public class DirectoryFlowLoaderTest {
   @Ignore @Test
   public void testRecursiveLoadEmbeddedFlow() {
     Logger logger = Logger.getLogger(this.getClass());
-    DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+    DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
 
     loader.loadProjectFlow(new File("unit/executions/embeddedBad"));
     for (String error : loader.getErrors()) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index d68bd16..677ae8c 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -19,6 +19,8 @@ package azkaban.execapp;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
@@ -54,6 +56,7 @@ import azkaban.project.ProjectLoader;
 import azkaban.server.AzkabanServer;
 import azkaban.server.ServerConstants;
 import azkaban.utils.Props;
+import azkaban.utils.SystemMemoryInfo;
 import azkaban.utils.Utils;
 
 
@@ -122,6 +125,9 @@ public class AzkabanExecutorServer {
     configureMBeanServer();
     configureMetricReports();
 
+    SystemMemoryInfo.init(props.getBoolean("executor.memCheck.enabled", true),
+            props.getLong("executor.memCheck.freeMemDecrAmt", 0));
+
     try {
       server.start();
     } catch (Exception e) {
@@ -245,6 +251,12 @@ public class AzkabanExecutorServer {
 
       @Override
       public void run() {
+        try {
+          logTopMemoryConsumers();
+        } catch (Exception e) {
+          logger.info(("Exception when logging top memory consumers"), e);
+        }
+
         logger.info("Shutting down http server...");
         try {
           app.stopServer();
@@ -253,6 +265,26 @@ public class AzkabanExecutorServer {
         }
         logger.info("kk thx bye.");
       }
+
+      public void logTopMemoryConsumers() throws Exception, IOException {
+        if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
+                && new File("/usr/bin/head").exists()) {
+          logger.info("logging top memeory consumer");
+
+          java.lang.ProcessBuilder processBuilder =
+                  new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/ps aux --sort -rss | /usr/bin/head");
+          Process p = processBuilder.start();
+          p.waitFor();
+  
+          InputStream is = p.getInputStream();
+          java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(is));
+          String line = null;
+          while ((line = reader.readLine()) != null) {
+            logger.info(line);
+          }
+          is.close();
+        }
+      }      
     });
   }
 
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 8d258ee..15cb90a 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -23,7 +23,6 @@ import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
-
 import org.junit.Assert;
 import org.junit.After;
 import org.junit.Before;
@@ -49,6 +48,7 @@ import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
 import azkaban.project.MockProjectLoader;
 import azkaban.utils.DirectoryFlowLoader;
+import azkaban.utils.Props;
 
 /**
  * Flows in this test:
@@ -649,7 +649,7 @@ public class FlowRunnerPipelineTest {
 
   private void prepareProject(File directory) throws ProjectManagerException,
       IOException {
-    DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+    DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
     loader.loadProjectFlow(directory);
     if (!loader.getErrors().isEmpty()) {
       for (String error : loader.getErrors()) {
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 5db16f9..a586f64 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -209,7 +209,7 @@ public class FlowRunnerPropertyResolutionTest {
 
   private void prepareProject(File directory) throws ProjectManagerException,
       IOException {
-    DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+    DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
     loader.loadProjectFlow(directory);
     if (!loader.getErrors().isEmpty()) {
       for (String error : loader.getErrors()) {
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index fad4450..00f61b4 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -1380,7 +1380,7 @@ public class FlowRunnerTest2 {
 
   private void prepareProject(File directory)
       throws ProjectManagerException, IOException {
-    DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+    DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
     loader.loadProjectFlow(directory);
     if (!loader.getErrors().isEmpty()) {
       for (String error: loader.getErrors()) {
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 92e8fd3..9a912fe 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -19,6 +19,8 @@ package azkaban.webapp;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.net.MalformedURLException;
@@ -802,6 +804,12 @@ public class AzkabanWebServer extends AzkabanServer {
     Runtime.getRuntime().addShutdownHook(new Thread() {
 
       public void run() {
+        try {
+          logTopMemoryConsumers();
+        } catch (Exception e) {
+          logger.info(("Exception when logging top memory consumers"), e);
+        }
+
         logger.info("Shutting down http server...");
         try {
           app.close();
@@ -812,6 +820,26 @@ public class AzkabanWebServer extends AzkabanServer {
         }
         logger.info("kk thx bye.");
       }
+
+      public void logTopMemoryConsumers() throws Exception, IOException {
+        if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
+                && new File("/usr/bin/head").exists()) {
+          logger.info("logging top memeory consumer");
+
+          java.lang.ProcessBuilder processBuilder =
+                  new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/ps aux --sort -rss | /usr/bin/head");
+          Process p = processBuilder.start();
+          p.waitFor();
+  
+          InputStream is = p.getInputStream();
+          java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(is));
+          String line = null;
+          while ((line = reader.readLine()) != null) {
+            logger.info(line);
+          }
+          is.close();
+        }
+      }      
     });
     logger.info("Server running on " + (ssl ? "ssl" : "") + " port " + port
         + ".");