azkaban-aplcache

Fixing DB failover functionalities (#1410) When Mysql failovers

8/30/2017 8:25:08 PM

Details

diff --git a/azkaban-db/src/main/java/azkaban/db/AzDBUtil.java b/azkaban-db/src/main/java/azkaban/db/AzDBUtil.java
index 51f963e..5cd172e 100644
--- a/azkaban-db/src/main/java/azkaban/db/AzDBUtil.java
+++ b/azkaban-db/src/main/java/azkaban/db/AzDBUtil.java
@@ -18,5 +18,6 @@ package azkaban.db;
 
 public class AzDBUtil {
 
-  static final int MAX_DB_RETRY_COUNT = 5;
+  // A very big Integer
+  static final int MAX_DB_RETRY_COUNT = 999999;
 }
diff --git a/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java b/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
index 09e4f35..85ef742 100644
--- a/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
+++ b/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
@@ -16,11 +16,12 @@
 package azkaban.db;
 
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.log4j.Logger;
 
-
 public class MySQLDataSource extends AzkabanDataSource {
 
   private static final Logger logger = Logger.getLogger(MySQLDataSource.class);
@@ -43,12 +44,13 @@ public class MySQLDataSource extends AzkabanDataSource {
 
   /**
    * This method overrides {@link BasicDataSource#getConnection()}, in order to have retry logics.
+   * We don't make the call synchronized in order to guarantee normal cases performance.
    */
   @Override
-  public synchronized Connection getConnection() throws SQLException {
+  public Connection getConnection() throws SQLException {
 
     Connection connection = null;
-    int retryAttempt = 0;
+    int retryAttempt = 1;
     while (retryAttempt < AzDBUtil.MAX_DB_RETRY_COUNT) {
       try {
         /**
@@ -58,18 +60,57 @@ public class MySQLDataSource extends AzkabanDataSource {
          * Every Attempt generates a thread-hanging-time, about 75 seconds, which is hard coded, and can not be changed.
          */
         connection = createDataSource().getConnection();
-        if (connection != null) {
+
+        /**
+         * If connection is null or connection is read only, retry to find available connection.
+         * When DB fails over from master to slave, master is set to read-only mode. We must keep
+         * finding correct data source and sql connection.
+         */
+        if (connection == null || isReadOnly(connection)) {
+          throw new SQLException("Failed to find DB connection Or connection is read only. ");
+        } else {
           return connection;
         }
       } catch (final SQLException ex) {
-        logger.error(
-            "Failed to find DB connection. waits 1 minutes and retry. No.Attempt = " + retryAttempt,
-            ex);
-      } finally {
+
+        /**
+         * invalidate connection and reconstruct it later. if remote IP address is not reachable,
+         * it will get hang for a while and throw exception.
+         */
+        try {
+          invalidateConnection(connection);
+        } catch (final Exception e) {
+          logger.error( "can not invalidate connection.", e);
+        }
+        logger.error( "Failed to find write-enabled DB connection. Wait 15 seconds and retry."
+            + " No.Attempt = " + retryAttempt, ex);
+        /**
+         * When database is completed down, DB connection fails to be fetched immediately. So we need
+         * to sleep 15 seconds for retry.
+         */
+        sleep(1000L * 15);
         retryAttempt++;
       }
     }
-    return null;
+    return connection;
+  }
+
+  private boolean isReadOnly(final Connection conn) throws SQLException {
+    final Statement stmt = conn.createStatement();
+    final ResultSet rs = stmt.executeQuery("SELECT @@global.read_only");
+    if (rs.next()) {
+      final int value = rs.getInt(1);
+      return value != 0;
+    }
+    throw new SQLException("can not fetch read only value from DB");
+  }
+
+  private void sleep(final long milliseconds) {
+    try {
+      Thread.sleep(milliseconds);
+    } catch (final InterruptedException e) {
+      logger.error("Sleep interrupted", e);
+    }
   }
 
   @Override