Details
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index 645401e..a36d33d 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.log4j.Logger;
+import azkaban.utils.Pair;
import azkaban.utils.Props;
public class JavaProcessJob extends ProcessJob {
@@ -147,4 +148,13 @@ public class JavaProcessJob extends ProcessJob {
return "";
}
-}
+
+ protected Pair<Long, Long> getProcMemoryRequirement() {
+ String strInitMemSize = getInitialMemorySize();
+ String strMaxMemSize = getMaxMemorySize();
+ long initMemSize = azkaban.utils.Utils.parseMemString2KB(strInitMemSize);
+ long maxMemSize = azkaban.utils.Utils.parseMemString2KB(strMaxMemSize);
+
+ return new Pair<Long, Long>(initMemSize, maxMemSize);
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index bf856fb..43f26f3 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -26,7 +26,9 @@ import org.apache.log4j.Logger;
import azkaban.jobExecutor.utils.process.AzkabanProcess;
import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
+import azkaban.utils.Pair;
import azkaban.utils.Props;
+import azkaban.utils.SystemMemoryInfo;
/**
* A job that runs a simple unix command
@@ -50,6 +52,13 @@ public class ProcessJob extends AbstractProcessJob {
handleError("Bad property definition! " + e.getMessage(), e);
}
+ Pair<Long, Long> memPair = getProcMemoryRequirement();
+ boolean memGranted = SystemMemoryInfo.requestMemory(memPair.getFirst(), memPair.getSecond());
+ if (!memGranted) {
+ throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+ memPair.getFirst(), memPair.getSecond(), getId()));
+ }
+
List<String> commands = null;
try {
commands = getCommandList();
@@ -101,6 +110,10 @@ public class ProcessJob extends AbstractProcessJob {
generateProperties(propFiles[1]);
}
+ protected Pair<Long, Long> getProcMemoryRequirement() {
+ return new Pair<Long, Long>(0L, 0L);
+ }
+
protected void handleError(String errorMsg, Exception e) throws Exception {
error(errorMsg);
if (e != null) {
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
index 1254911..52d376c 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
@@ -132,7 +132,7 @@ public class XmlValidatorManager implements ValidatorManager {
public void loadValidators(Props props, Logger log) {
validators = new LinkedHashMap<String, ProjectValidator>();
// Add the default validator
- DirectoryFlowLoader flowLoader = new DirectoryFlowLoader(log);
+ DirectoryFlowLoader flowLoader = new DirectoryFlowLoader(props, log);
validators.put(flowLoader.getValidatorName(), flowLoader);
if (!props.containsKey(ValidatorConfigs.XML_FILE_PARAM)) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
index 0ccd1dc..36ac7ca 100644
--- a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
@@ -44,8 +44,15 @@ public class DirectoryFlowLoader implements ProjectValidator {
private static final DirFilter DIR_FILTER = new DirFilter();
private static final String PROPERTY_SUFFIX = ".properties";
private static final String JOB_SUFFIX = ".job";
+ private static final String JOB_MAX_XMS = "job.max.Xms";
+ private static final String MAX_XMS_DEFAULT = "1G";
+ private static final String JOB_MAX_XMX = "job.max.Xmx";
+ private static final String MAX_XMX_DEFAULT = "2G";
+ private static final String XMS = "Xms";
+ private static final String XMX = "Xmx";
private final Logger logger;
+ private Props props;
private HashSet<String> rootNodes;
private HashMap<String, Flow> flowMap;
private HashMap<String, Node> nodeMap;
@@ -60,8 +67,9 @@ public class DirectoryFlowLoader implements ProjectValidator {
private Set<String> errors;
private Set<String> duplicateJobs;
- public DirectoryFlowLoader(Logger logger) {
+ public DirectoryFlowLoader(Props props, Logger logger) {
this.logger = logger;
+ this.props = props;
}
public Map<String, Flow> getFlowMap() {
@@ -95,6 +103,8 @@ public class DirectoryFlowLoader implements ProjectValidator {
// Load all the props files and create the Node objects
loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);
+ jobPropertiesCheck();
+
// Create edges and find missing dependencies
resolveDependencies();
@@ -366,6 +376,27 @@ public class DirectoryFlowLoader implements ProjectValidator {
visited.remove(node.getId());
}
+ private void jobPropertiesCheck() {
+ String maxXms = props.getString(JOB_MAX_XMS, MAX_XMS_DEFAULT);
+ String maxXmx = props.getString(JOB_MAX_XMX, MAX_XMX_DEFAULT);
+ long sizeMaxXms = azkaban.utils.Utils.parseMemString2KB(maxXms);
+ long sizeMaxXmx = azkaban.utils.Utils.parseMemString2KB(maxXmx);
+
+ for (String jobName : jobPropsMap.keySet()) {
+ Props jobProps = jobPropsMap.get(jobName);
+ String xms = jobProps.getString(XMS, null);
+ if (xms != null && azkaban.utils.Utils.parseMemString2KB(xms) > sizeMaxXms) {
+ errors.add(String.format("%s: Xms value has exceeded the allowed limit (max Xms = %s)",
+ jobName, maxXms));
+ }
+ String xmx = jobProps.getString(XMX, null);
+ if (xmx != null && azkaban.utils.Utils.parseMemString2KB(xmx) > sizeMaxXmx) {
+ errors.add(String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
+ jobName, maxXmx));
+ }
+ }
+ }
+
private String getNameWithoutExtension(File file) {
String filename = file.getName();
int index = filename.lastIndexOf('.');
diff --git a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
new file mode 100644
index 0000000..63ae7a7
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
@@ -0,0 +1,165 @@
+package azkaban.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+/**
+ * @author wkang
+ * This class is used to maintain system memory information. Processes utilizing
+ * large amount of memory should consult this class to see if the system has enough
+ * memory to proceed the operation.
+ *
+ * Memory information is obtained from /proc/meminfo, so only Unix/Linux like system
+ * will support this class.
+ *
+ * All the memory size used in this function is in KB
+ */
+public class SystemMemoryInfo {
+ private static final Logger logger = Logger.getLogger(SystemMemoryInfo.class);
+
+ private static String MEMINFO_FILE = "/proc/meminfo";
+ private static boolean memCheckEnabled;
+ private static boolean memInfoExists;
+ private static long freeMemAmount = 0;
+ private static long freeMemDecrAmt = 0;
+ private static final long LOW_MEM_THRESHOLD = 3L*1024L*1024L; //3 GB
+
+ private static ScheduledExecutorService scheduledExecutorService;
+
+ public static void init(boolean memChkEnabled, long memDecrAmt) {
+ File f = new File(MEMINFO_FILE);
+ memInfoExists = f.exists() && !f.isDirectory();
+ memCheckEnabled = memChkEnabled && memInfoExists;
+ if (memCheckEnabled) {
+ //initial reading of the mem info
+ readMemoryInfoFile();
+ freeMemDecrAmt = memDecrAmt;
+
+ //schedule a thread to read it
+ logger.info("Scheduled thread to read /proc/meminfo every 30 seconds");
+ scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService.scheduleAtFixedRate(new MemoryInfoReader(), 0, 30, TimeUnit.SECONDS);
+ }
+ }
+
+ public synchronized static boolean requestMemory(long xms, long xmx) {
+ if (!memCheckEnabled) {
+ return true;
+ }
+
+ //too small amount of memory left, reject
+ if (freeMemAmount < LOW_MEM_THRESHOLD) {
+ logger.info(String.format("Free memory amount (%d kb) is less than low mem threshold (%d kb), memory request declined.",
+ freeMemAmount, LOW_MEM_THRESHOLD));
+ return false;
+ }
+
+ //let's get newest mem info
+ if (freeMemAmount >= LOW_MEM_THRESHOLD && freeMemAmount < 2 * LOW_MEM_THRESHOLD) {
+ logger.info(String.format("Free memory amount (%d kb) is less than 2x low mem threshold (%d kb), re-read /proc/meminfo",
+ freeMemAmount, LOW_MEM_THRESHOLD));
+ readMemoryInfoFile();
+ }
+
+ //too small amount of memory left, reject
+ if (freeMemAmount < LOW_MEM_THRESHOLD) {
+ logger.info(String.format("Free memory amount (%d kb) is less than low mem threshold (%d kb), memory request declined.",
+ freeMemAmount, LOW_MEM_THRESHOLD));
+ return false;
+ }
+
+ if (freeMemAmount - xmx < LOW_MEM_THRESHOLD) {
+ logger.info(String.format("Free memory amount minus xmx (%d - %d kb) is less than low mem threshold (%d kb), memory request declined.",
+ freeMemAmount, xmx, LOW_MEM_THRESHOLD));
+ return false;
+ }
+
+ if (freeMemDecrAmt > 0) {
+ freeMemAmount -= freeMemDecrAmt;
+ } else {
+ freeMemAmount -= xms;
+ }
+
+ logger.info("Current free memory amount is " + freeMemAmount);
+ return true;
+ }
+
+ private synchronized static void updateFreeMemAmount(long size) {
+ freeMemAmount = size;
+ }
+
+ private static void readMemoryInfoFile() {
+ BufferedReader br = null;
+ try {
+ br = new BufferedReader(new FileReader(MEMINFO_FILE));
+
+ long sizeMemFree = 0;
+ long sizeBuffers = 0;
+ long sizeCached = 0;
+ long sizeSwapCached = 0;
+ int count = 0;
+ String line = br.readLine();
+ while (line != null) {
+ if (line.startsWith("MemFree:") || line.startsWith("Buffers:")
+ || line.startsWith("Cached") || line.startsWith("SwapCached")) {
+ int idx1 = line.indexOf(":");
+ int idx2 = line.lastIndexOf("kB");
+ String strSize = line.substring(idx1+1, idx2-1).trim();
+
+ if (line.startsWith("MemFree:")) {
+ sizeMemFree = Long.parseLong(strSize);
+ } else if (line.startsWith("Buffers:")) {
+ sizeBuffers = Long.parseLong(strSize);
+ } else if (line.startsWith("Cached")) {
+ sizeCached = Long.parseLong(strSize);
+ } else if (line.startsWith("SwapCached")) {
+ sizeSwapCached = Long.parseLong(strSize);
+ }
+
+ //all lines read
+ if (++count == 4) {
+ break;
+ }
+ }
+ line = br.readLine();
+ }
+
+ if (count < 4) {
+ logger.error("Error: less than 5 rows read for free memory infor");
+ }
+
+ long sizeTotal = sizeMemFree + sizeBuffers + sizeCached + sizeSwapCached;
+
+ logger.info(String.format("Current system free memory is %d (MemFree %d, Buffers %d, Cached %d, SwapCached %d)",
+ sizeTotal, sizeMemFree, sizeBuffers, sizeCached, sizeSwapCached));
+
+ if (sizeTotal > 0) {
+ updateFreeMemAmount(sizeTotal);
+ }
+ } catch (IOException e) {
+ logger.error("Exception in reading memory info file", e);
+ } finally {
+ try {
+ if (br != null) {
+ br.close();
+ }
+ } catch (IOException e) {
+ logger.error("Exception in closing the buffered reader", e);
+ }
+ }
+ }
+
+ static class MemoryInfoReader implements Runnable {
+ @Override
+ public void run() {
+ readMemoryInfoFile();
+ }
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index 7c417ca..ed9fc52 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -35,7 +35,6 @@ import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;
import org.apache.commons.io.IOUtils;
-
import org.joda.time.Days;
import org.joda.time.DurationFieldType;
import org.joda.time.Hours;
@@ -409,4 +408,27 @@ public class Utils {
return periodStr;
}
+
+ public static long parseMemString2KB(String strMemSize) {
+ if (strMemSize == null) {
+ return 0L;
+ }
+
+ long size = 0L;
+ if (strMemSize.endsWith("g") || strMemSize.endsWith("G")
+ || strMemSize.endsWith("m") || strMemSize.endsWith("M")
+ || strMemSize.endsWith("k") || strMemSize.endsWith("K")) {
+ String strSize = strMemSize.substring(0, strMemSize.length() - 1);
+ size = Long.parseLong(strSize);
+ }
+
+ long sizeInKb = 0L;
+ if (strMemSize.endsWith("g") || strMemSize.endsWith("G")) {
+ sizeInKb = size * 1024L * 1024L;
+ } else if (strMemSize.endsWith("m") || strMemSize.endsWith("M")) {
+ sizeInKb = size * 1024L;
+ }
+
+ return sizeInKb;
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
index 8997fee..5948c7b 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -37,6 +36,7 @@ import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
public class ExecutableFlowTest {
private Project project;
@@ -44,7 +44,7 @@ public class ExecutableFlowTest {
@Before
public void setUp() throws Exception {
Logger logger = Logger.getLogger(this.getClass());
- DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
loader.loadProjectFlow(new File("unit/executions/embedded"));
Assert.assertEquals(0, loader.getErrors().size());
diff --git a/azkaban-common/src/test/java/azkaban/utils/DirectoryFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/utils/DirectoryFlowLoaderTest.java
index f658bb5..b45b853 100644
--- a/azkaban-common/src/test/java/azkaban/utils/DirectoryFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/DirectoryFlowLoaderTest.java
@@ -29,7 +29,7 @@ public class DirectoryFlowLoaderTest {
@Ignore @Test
public void testDirectoryLoad() {
Logger logger = Logger.getLogger(this.getClass());
- DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
loader.loadProjectFlow(new File("unit/executions/exectest1"));
logger.info(loader.getFlowMap().size());
@@ -38,7 +38,7 @@ public class DirectoryFlowLoaderTest {
@Ignore @Test
public void testLoadEmbeddedFlow() {
Logger logger = Logger.getLogger(this.getClass());
- DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
loader.loadProjectFlow(new File("unit/executions/embedded"));
Assert.assertEquals(0, loader.getErrors().size());
@@ -47,7 +47,7 @@ public class DirectoryFlowLoaderTest {
@Ignore @Test
public void testRecursiveLoadEmbeddedFlow() {
Logger logger = Logger.getLogger(this.getClass());
- DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
loader.loadProjectFlow(new File("unit/executions/embeddedBad"));
for (String error : loader.getErrors()) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index d68bd16..677ae8c 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -19,6 +19,8 @@ package azkaban.execapp;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
@@ -54,6 +56,7 @@ import azkaban.project.ProjectLoader;
import azkaban.server.AzkabanServer;
import azkaban.server.ServerConstants;
import azkaban.utils.Props;
+import azkaban.utils.SystemMemoryInfo;
import azkaban.utils.Utils;
@@ -122,6 +125,9 @@ public class AzkabanExecutorServer {
configureMBeanServer();
configureMetricReports();
+ SystemMemoryInfo.init(props.getBoolean("executor.memCheck.enabled", true),
+ props.getLong("executor.memCheck.freeMemDecrAmt", 0));
+
try {
server.start();
} catch (Exception e) {
@@ -245,6 +251,12 @@ public class AzkabanExecutorServer {
@Override
public void run() {
+ try {
+ logTopMemoryConsumers();
+ } catch (Exception e) {
+ logger.info(("Exception when logging top memory consumers"), e);
+ }
+
logger.info("Shutting down http server...");
try {
app.stopServer();
@@ -253,6 +265,26 @@ public class AzkabanExecutorServer {
}
logger.info("kk thx bye.");
}
+
+ public void logTopMemoryConsumers() throws Exception, IOException {
+ if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
+ && new File("/usr/bin/head").exists()) {
+ logger.info("logging top memeory consumer");
+
+ java.lang.ProcessBuilder processBuilder =
+ new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/ps aux --sort -rss | /usr/bin/head");
+ Process p = processBuilder.start();
+ p.waitFor();
+
+ InputStream is = p.getInputStream();
+ java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(is));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ logger.info(line);
+ }
+ is.close();
+ }
+ }
});
}
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 8d258ee..15cb90a 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
-
import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
@@ -49,6 +48,7 @@ import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
import azkaban.project.MockProjectLoader;
import azkaban.utils.DirectoryFlowLoader;
+import azkaban.utils.Props;
/**
* Flows in this test:
@@ -649,7 +649,7 @@ public class FlowRunnerPipelineTest {
private void prepareProject(File directory) throws ProjectManagerException,
IOException {
- DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
loader.loadProjectFlow(directory);
if (!loader.getErrors().isEmpty()) {
for (String error : loader.getErrors()) {
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 5db16f9..a586f64 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -209,7 +209,7 @@ public class FlowRunnerPropertyResolutionTest {
private void prepareProject(File directory) throws ProjectManagerException,
IOException {
- DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
loader.loadProjectFlow(directory);
if (!loader.getErrors().isEmpty()) {
for (String error : loader.getErrors()) {
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index fad4450..00f61b4 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -1380,7 +1380,7 @@ public class FlowRunnerTest2 {
private void prepareProject(File directory)
throws ProjectManagerException, IOException {
- DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
loader.loadProjectFlow(directory);
if (!loader.getErrors().isEmpty()) {
for (String error: loader.getErrors()) {
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 92e8fd3..9a912fe 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -19,6 +19,8 @@ package azkaban.webapp;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.net.MalformedURLException;
@@ -802,6 +804,12 @@ public class AzkabanWebServer extends AzkabanServer {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
+ try {
+ logTopMemoryConsumers();
+ } catch (Exception e) {
+ logger.info(("Exception when logging top memory consumers"), e);
+ }
+
logger.info("Shutting down http server...");
try {
app.close();
@@ -812,6 +820,26 @@ public class AzkabanWebServer extends AzkabanServer {
}
logger.info("kk thx bye.");
}
+
+ public void logTopMemoryConsumers() throws Exception, IOException {
+ if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
+ && new File("/usr/bin/head").exists()) {
+ logger.info("logging top memeory consumer");
+
+ java.lang.ProcessBuilder processBuilder =
+ new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/ps aux --sort -rss | /usr/bin/head");
+ Process p = processBuilder.start();
+ p.waitFor();
+
+ InputStream is = p.getInputStream();
+ java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(is));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ logger.info(line);
+ }
+ is.close();
+ }
+ }
});
logger.info("Server running on " + (ssl ? "ssl" : "") + " port " + port
+ ".");