azkaban-uncached

add mysql db monitoring thread

12/13/2013 10:15:26 PM

Details

diff --git a/src/java/azkaban/database/DataSourceUtils.java b/src/java/azkaban/database/DataSourceUtils.java
index cc95828..ff40296 100644
--- a/src/java/azkaban/database/DataSourceUtils.java
+++ b/src/java/azkaban/database/DataSourceUtils.java
@@ -17,15 +17,18 @@
 package azkaban.database;
 
 import java.sql.SQLException;
-
 import javax.sql.DataSource;
-
+import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
-
+import org.apache.log4j.Logger;
+import java.sql.PreparedStatement;
+import java.sql.Connection;
 import azkaban.utils.Props;
 
 public class DataSourceUtils {
 	
+	private static Logger logger = Logger.getLogger(DataSourceUtils.class);
+	
 	/**
 	 * Property types
 	 */
@@ -115,6 +118,9 @@ public class DataSourceUtils {
 	 *
 	 */
 	public static class MySQLBasicDataSource extends AzkabanDataSource {
+		
+		private static MonitorThread monitorThread = null;
+		
 		private MySQLBasicDataSource(String host, int port, String dbName, String user, String password, int numConnections) {
 			super();
 			
@@ -126,6 +132,11 @@ public class DataSourceUtils {
 			setMaxActive(numConnections);
 			setValidationQuery("/* ping */ select 1");
 			setTestOnBorrow(true);
+			
+			if(monitorThread == null) {
+				monitorThread = new MonitorThread(this);
+				monitorThread.start();
+			}
 		}
 
 		@Override
@@ -137,6 +148,52 @@ public class DataSourceUtils {
 		public String getDBType() {
 			return "mysql";
 		}
+		
+		private class MonitorThread extends Thread {
+			private static final long MONITOR_THREAD_WAIT_INTERVAL_MS = 30*1000;
+			private boolean shutdown = false;
+			MySQLBasicDataSource dataSource;
+			
+			public MonitorThread(MySQLBasicDataSource mysqlSource) {
+				this.setName("MySQL-DB-Monitor-Thread");
+				dataSource = mysqlSource;
+			}
+			
+			@SuppressWarnings("unused")
+			public void shutdown() {
+				shutdown = true;
+				this.interrupt();
+			}
+			
+			public void run() {
+				while (!shutdown) {
+					synchronized (this) {
+						try {
+							pingDB();
+							wait(MONITOR_THREAD_WAIT_INTERVAL_MS);
+						} catch (InterruptedException e) {
+							logger.info("Interrupted. Probably to shut down.");
+						}
+					}
+				}
+			}
+
+			private void pingDB() {
+				Connection connection = null;
+				try {
+					connection = dataSource.getConnection();
+					PreparedStatement query = connection.prepareStatement("SELECT 1");
+					query.execute();
+				} catch (SQLException e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+					logger.error("MySQL connection test failed. Please check MySQL connection health!");
+				} finally {
+					DbUtils.closeQuietly(connection);
+				}
+			}
+		}
+
 	}
 	
 	/**