azkaban-aplcache

fix flaky test by creating new DatabaseSetup class (#1365) Run

8/18/2017 8:11:46 PM

Details

diff --git a/azkaban-common/src/test/java/azkaban/test/Utils.java b/azkaban-common/src/test/java/azkaban/test/Utils.java
index 9949b63..2dac04e 100644
--- a/azkaban-common/src/test/java/azkaban/test/Utils.java
+++ b/azkaban-common/src/test/java/azkaban/test/Utils.java
@@ -2,11 +2,11 @@ package azkaban.test;
 
 import static azkaban.ServiceProvider.SERVICE_PROVIDER;
 
-import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.db.AzDBTestUtility.EmbeddedH2BasicDataSource;
 import azkaban.db.AzkabanDataSource;
 import azkaban.db.DatabaseOperator;
 import azkaban.db.DatabaseOperatorImpl;
+import azkaban.db.DatabaseSetup;
 import azkaban.utils.Props;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -37,13 +37,8 @@ public class Utils {
     final Props props = new Props();
     props.put("database.sql.scripts.dir", sqlScriptsDir);
 
-    // TODO kunkun-tang: Need to refactor AzkabanDatabaseSetup to accept datasource in azkaban-db
-    final azkaban.database.AzkabanDataSource dataSourceForSetupDB =
-        new azkaban.database.AzkabanConnectionPoolTest.EmbeddedH2BasicDataSource();
-    final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(dataSourceForSetupDB, props);
-    setup.loadTableInfo();
-    setup.updateDatabase(true, false);
-
+    final DatabaseSetup setup = new DatabaseSetup(dataSource, sqlScriptsDir);
+    setup.updateDatabase();
     return new DatabaseOperatorImpl(new QueryRunner(dataSource));
   }
 }
diff --git a/azkaban-db/build.gradle b/azkaban-db/build.gradle
index a809145..9031a77 100644
--- a/azkaban-db/build.gradle
+++ b/azkaban-db/build.gradle
@@ -21,6 +21,7 @@ dependencies {
     // todo kunkun-tang: consolidate dependencies in azkaban-common and azkaban-db
     compile deps.dbutils
     compile deps.dbcp2
+    compile deps.io
 
     testRuntime deps.h2
 }
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseSetup.java b/azkaban-db/src/main/java/azkaban/db/DatabaseSetup.java
new file mode 100644
index 0000000..7d3f9ff
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseSetup.java
@@ -0,0 +1,198 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.db;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * TODO kunkun-tang: this class is quite messy now, needs to be refactored.
+ */
+public class DatabaseSetup {
+
+  public static final String DATABASE_SQL_SCRIPT_DIR =
+      "database.sql.scripts.dir";
+  private static final Logger logger = Logger
+      .getLogger(DatabaseSetup.class);
+  private static final String DEFAULT_SCRIPT_PATH = "sql";
+  private static final String CREATE_SCRIPT_PREFIX = "create.";
+  private static final String SQL_SCRIPT_SUFFIX = ".sql";
+
+  private static final String INSERT_DB_PROPERTY =
+      "INSERT INTO properties (name, type, value, modified_time) values (?,?,?,?)";
+  private static final String UPDATE_DB_PROPERTY =
+      "UPDATE properties SET value=?,modified_time=? WHERE name=? AND type=?";
+
+  private final AzkabanDataSource dataSource;
+  private final Set<String> missingTables = new HashSet<>();
+  private final Map<String, String> installedVersions = new HashMap<>();
+  private String version;
+  private boolean needsUpdating;
+
+  private String scriptPath = null;
+
+  public DatabaseSetup(final AzkabanDataSource ds) {
+    this.dataSource = ds;
+    if (this.scriptPath == null) {
+      this.scriptPath = DEFAULT_SCRIPT_PATH;
+    }
+  }
+
+  public DatabaseSetup(final AzkabanDataSource ds, final String path) {
+    this.dataSource = ds;
+    this.scriptPath = path;
+  }
+
+  public void updateDatabase()
+      throws SQLException, IOException {
+    findMissingTables();
+    createNewTables();
+  }
+  private void findMissingTables() {
+    final File directory = new File(this.scriptPath);
+    final File[] createScripts =
+        directory.listFiles(new PrefixSuffixFileFilter(
+            CREATE_SCRIPT_PREFIX, SQL_SCRIPT_SUFFIX));
+    if (createScripts != null) {
+      for (final File script : createScripts) {
+        final String name = script.getName();
+        final String[] nameSplit = name.split("\\.");
+        final String tableName = nameSplit[1];
+        this.missingTables.add(tableName);
+      }
+    }
+  }
+  private void createNewTables() throws SQLException, IOException {
+    final Connection conn = this.dataSource.getConnection();
+    conn.setAutoCommit(false);
+    try {
+      // Make sure that properties table is created first.
+      if (this.missingTables.contains("properties")) {
+        runTableScripts(conn, "properties", this.version, this.dataSource.getDBType(),
+            false);
+      }
+      for (final String table : this.missingTables) {
+        if (!table.equals("properties")) {
+          runTableScripts(conn, table, this.version, this.dataSource.getDBType(), false);
+          // update version as we have create a new table
+          this.installedVersions.put(table, this.version);
+        }
+      }
+    } finally {
+      conn.close();
+    }
+  }
+
+
+  private void runTableScripts(final Connection conn, final String table, final String version,
+      final String dbType, final boolean update) throws IOException, SQLException {
+    String scriptName = "";
+    if (update) {
+      scriptName = "update." + table + "." + version;
+      logger.info("Update table " + table + " to version " + version);
+    } else {
+      scriptName = "create." + table;
+      logger.info("Creating new table " + table + " version " + version);
+    }
+
+    final String dbSpecificScript = scriptName + "." + dbType + ".sql";
+
+    File script = new File(this.scriptPath, dbSpecificScript);
+    if (!script.exists()) {
+      final String dbScript = scriptName + ".sql";
+      script = new File(this.scriptPath, dbScript);
+
+      if (!script.exists()) {
+        throw new IOException("Creation files do not exist for table " + table);
+      }
+    }
+
+    BufferedInputStream buff = null;
+    try {
+      buff = new BufferedInputStream(new FileInputStream(script));
+      final String queryStr = IOUtils.toString(buff);
+
+      final String[] splitQuery = queryStr.split(";\\s*\n");
+
+      final QueryRunner runner = new QueryRunner();
+
+      for (final String query : splitQuery) {
+        runner.update(conn, query);
+      }
+
+      // If it's properties, then we want to commit the table before we update
+      // it
+      if (table.equals("properties")) {
+        conn.commit();
+      }
+
+      final String propertyName = table + ".version";
+      if (!this.installedVersions.containsKey(table)) {
+        runner.update(conn, INSERT_DB_PROPERTY, propertyName,
+            1, version,
+            System.currentTimeMillis());
+      } else {
+        runner.update(conn, UPDATE_DB_PROPERTY, version,
+            System.currentTimeMillis(), propertyName, 1);
+      }
+      conn.commit();
+    } finally {
+      IOUtils.closeQuietly(buff);
+    }
+  }
+
+
+  // Reuse code from azkaban.utils.FileIOUtils.PrefixSuffixFileFilter
+  // Todo kunkun-tang: needs to be create az core modules to put this class
+  public static class PrefixSuffixFileFilter implements FileFilter {
+
+    private final String prefix;
+    private final String suffix;
+
+    public PrefixSuffixFileFilter(final String prefix, final String suffix) {
+      this.prefix = prefix;
+      this.suffix = suffix;
+    }
+
+    @Override
+    public boolean accept(final File pathname) {
+      if (!pathname.isFile() || pathname.isHidden()) {
+        return false;
+      }
+
+      final String name = pathname.getName();
+      final int length = name.length();
+      if (this.suffix.length() > length || this.prefix.length() > length) {
+        return false;
+      }
+
+      return name.startsWith(this.prefix) && name.endsWith(this.suffix);
+    }
+  }
+}