diff --git a/src/java/azkaban/database/DataSourceUtils.java b/src/java/azkaban/database/DataSourceUtils.java
index cc95828..ff40296 100644
--- a/src/java/azkaban/database/DataSourceUtils.java
+++ b/src/java/azkaban/database/DataSourceUtils.java
@@ -17,15 +17,18 @@
package azkaban.database;
import java.sql.SQLException;
-
import javax.sql.DataSource;
-
+import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
-
+import org.apache.log4j.Logger;
+import java.sql.PreparedStatement;
+import java.sql.Connection;
import azkaban.utils.Props;
public class DataSourceUtils {
+ private static Logger logger = Logger.getLogger(DataSourceUtils.class);
+
/**
* Property types
*/
@@ -115,6 +118,9 @@ public class DataSourceUtils {
*
*/
public static class MySQLBasicDataSource extends AzkabanDataSource {
+
+ private static MonitorThread monitorThread = null;
+
private MySQLBasicDataSource(String host, int port, String dbName, String user, String password, int numConnections) {
super();
@@ -126,6 +132,11 @@ public class DataSourceUtils {
setMaxActive(numConnections);
setValidationQuery("/* ping */ select 1");
setTestOnBorrow(true);
+
+ if(monitorThread == null) {
+ monitorThread = new MonitorThread(this);
+ monitorThread.start();
+ }
}
@Override
@@ -137,6 +148,52 @@ public class DataSourceUtils {
public String getDBType() {
return "mysql";
}
+
+ private class MonitorThread extends Thread {
+ private static final long MONITOR_THREAD_WAIT_INTERVAL_MS = 30*1000;
+ private boolean shutdown = false;
+ MySQLBasicDataSource dataSource;
+
+ public MonitorThread(MySQLBasicDataSource mysqlSource) {
+ this.setName("MySQL-DB-Monitor-Thread");
+ dataSource = mysqlSource;
+ }
+
+ @SuppressWarnings("unused")
+ public void shutdown() {
+ shutdown = true;
+ this.interrupt();
+ }
+
+ public void run() {
+ while (!shutdown) {
+ synchronized (this) {
+ try {
+ pingDB();
+ wait(MONITOR_THREAD_WAIT_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ logger.info("Interrupted. Probably to shut down.");
+ }
+ }
+ }
+ }
+
+ private void pingDB() {
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ PreparedStatement query = connection.prepareStatement("SELECT 1");
+ query.execute();
+ } catch (SQLException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ logger.error("MySQL connection test failed. Please check MySQL connection health!");
+ } finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+ }
+
}
/**