DatabaseSetup.java

199 lines | 6.335 kB Blame History Raw Download
/*
 * Copyright 2017 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.db;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

/**
 * TODO kunkun-tang: this class is quite messy now, needs to be refactored.
 */
public class DatabaseSetup {

  public static final String DATABASE_SQL_SCRIPT_DIR =
      "database.sql.scripts.dir";
  private static final Logger logger = Logger
      .getLogger(DatabaseSetup.class);
  private static final String DEFAULT_SCRIPT_PATH = "sql";
  private static final String CREATE_SCRIPT_PREFIX = "create.";
  private static final String SQL_SCRIPT_SUFFIX = ".sql";

  private static final String INSERT_DB_PROPERTY =
      "INSERT INTO properties (name, type, value, modified_time) values (?,?,?,?)";
  private static final String UPDATE_DB_PROPERTY =
      "UPDATE properties SET value=?,modified_time=? WHERE name=? AND type=?";

  private final AzkabanDataSource dataSource;
  private final Set<String> missingTables = new HashSet<>();
  private final Map<String, String> installedVersions = new HashMap<>();
  private String version;
  private boolean needsUpdating;

  private String scriptPath = null;

  public DatabaseSetup(final AzkabanDataSource ds) {
    this.dataSource = ds;
    if (this.scriptPath == null) {
      this.scriptPath = DEFAULT_SCRIPT_PATH;
    }
  }

  public DatabaseSetup(final AzkabanDataSource ds, final String path) {
    this.dataSource = ds;
    this.scriptPath = path;
  }

  public void updateDatabase()
      throws SQLException, IOException {
    findMissingTables();
    createNewTables();
  }
  private void findMissingTables() {
    final File directory = new File(this.scriptPath);
    final File[] createScripts =
        directory.listFiles(new PrefixSuffixFileFilter(
            CREATE_SCRIPT_PREFIX, SQL_SCRIPT_SUFFIX));
    if (createScripts != null) {
      for (final File script : createScripts) {
        final String name = script.getName();
        final String[] nameSplit = name.split("\\.");
        final String tableName = nameSplit[1];
        this.missingTables.add(tableName);
      }
    }
  }
  private void createNewTables() throws SQLException, IOException {
    final Connection conn = this.dataSource.getConnection();
    conn.setAutoCommit(false);
    try {
      // Make sure that properties table is created first.
      if (this.missingTables.contains("properties")) {
        runTableScripts(conn, "properties", this.version, this.dataSource.getDBType(),
            false);
      }
      for (final String table : this.missingTables) {
        if (!table.equals("properties")) {
          runTableScripts(conn, table, this.version, this.dataSource.getDBType(), false);
          // update version as we have create a new table
          this.installedVersions.put(table, this.version);
        }
      }
    } finally {
      conn.close();
    }
  }


  private void runTableScripts(final Connection conn, final String table, final String version,
      final String dbType, final boolean update) throws IOException, SQLException {
    String scriptName = "";
    if (update) {
      scriptName = "update." + table + "." + version;
      logger.info("Update table " + table + " to version " + version);
    } else {
      scriptName = "create." + table;
      logger.info("Creating new table " + table + " version " + version);
    }

    final String dbSpecificScript = scriptName + "." + dbType + ".sql";

    File script = new File(this.scriptPath, dbSpecificScript);
    if (!script.exists()) {
      final String dbScript = scriptName + ".sql";
      script = new File(this.scriptPath, dbScript);

      if (!script.exists()) {
        throw new IOException("Creation files do not exist for table " + table);
      }
    }

    BufferedInputStream buff = null;
    try {
      buff = new BufferedInputStream(new FileInputStream(script));
      final String queryStr = IOUtils.toString(buff);

      final String[] splitQuery = queryStr.split(";\\s*\n");

      final QueryRunner runner = new QueryRunner();

      for (final String query : splitQuery) {
        runner.update(conn, query);
      }

      // If it's properties, then we want to commit the table before we update
      // it
      if (table.equals("properties")) {
        conn.commit();
      }

      final String propertyName = table + ".version";
      if (!this.installedVersions.containsKey(table)) {
        runner.update(conn, INSERT_DB_PROPERTY, propertyName,
            1, version,
            System.currentTimeMillis());
      } else {
        runner.update(conn, UPDATE_DB_PROPERTY, version,
            System.currentTimeMillis(), propertyName, 1);
      }
      conn.commit();
    } finally {
      IOUtils.closeQuietly(buff);
    }
  }


  // Reuse code from azkaban.utils.FileIOUtils.PrefixSuffixFileFilter
  // Todo kunkun-tang: needs to be create az core modules to put this class
  public static class PrefixSuffixFileFilter implements FileFilter {

    private final String prefix;
    private final String suffix;

    public PrefixSuffixFileFilter(final String prefix, final String suffix) {
      this.prefix = prefix;
      this.suffix = suffix;
    }

    @Override
    public boolean accept(final File pathname) {
      if (!pathname.isFile() || pathname.isHidden()) {
        return false;
      }

      final String name = pathname.getName();
      final int length = name.length();
      if (this.suffix.length() > length || this.prefix.length() > length) {
        return false;
      }

      return name.startsWith(this.prefix) && name.endsWith(this.suffix);
    }
  }
}