DatabaseExportDao.java

195 lines | 7.536 kB Blame History Raw Download
/*
 * Copyright 2010-2012 Ning, Inc.
 *
 * Ning licenses this file to you under the Apache License, version 2.0
 * (the "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at:
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package org.killbill.billing.util.export.dao;

import java.io.IOException;
import java.sql.Blob;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.inject.Inject;
import javax.inject.Singleton;

import org.killbill.billing.callcontext.InternalTenantContext;
import org.killbill.billing.util.api.ColumnInfo;
import org.killbill.billing.util.api.DatabaseExportOutputStream;
import org.killbill.billing.util.dao.TableName;
import org.killbill.billing.util.validation.DefaultColumnInfo;
import org.killbill.billing.util.validation.dao.DatabaseSchemaDao;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class DatabaseExportDao {

    private static final Logger logger = LoggerFactory.getLogger(DatabaseExportDao.class);

    private final DatabaseSchemaDao databaseSchemaDao;
    private final IDBI dbi;

    @Inject
    public DatabaseExportDao(final DatabaseSchemaDao databaseSchemaDao,
                             final IDBI dbi) {
        this.databaseSchemaDao = databaseSchemaDao;
        this.dbi = dbi;
    }

    private enum TableType {
        /* TableName.ACCOUNT */
        KB_ACCOUNT("record_id", "tenant_record_id"),
        /* TableName.ACCOUNT_HISTORY */
        KB_ACCOUNT_HISTORY("target_record_id", "tenant_record_id"),
        /* Any per-account data table */
        KB_PER_ACCOUNT("account_record_id", "tenant_record_id"),
        /* bus_events, notifications table */
        NOTIFICATION("search_key1", "search_key2"),
        /* To be discarded */
        OTHER(null, null);

        private final String accountRecordIdColumnName;
        private final String tenantRecordIdColumnName;

        TableType(final String accountRecordIdColumnName, final String tenantRecordIdColumnName) {
            this.accountRecordIdColumnName = accountRecordIdColumnName;
            this.tenantRecordIdColumnName = tenantRecordIdColumnName;
        }

        public String getAccountRecordIdColumnName() {
            return accountRecordIdColumnName;
        }

        public String getTenantRecordIdColumnName() {
            return tenantRecordIdColumnName;
        }
    }

    public void exportDataForAccount(final DatabaseExportOutputStream out, final InternalTenantContext context) {
        if (context.getAccountRecordId() == null || context.getTenantRecordId() == null) {
            return;
        }

        final List<DefaultColumnInfo> columns = databaseSchemaDao.getColumnInfoList();
        if (columns.size() == 0) {
            return;
        }

        final List<ColumnInfo> columnsForTable = new ArrayList<ColumnInfo>();
        // The list of columns is ordered by table name first
        String lastSeenTableName = columns.get(0).getTableName();
        for (final ColumnInfo column : columns) {
            if (!column.getTableName().equals(lastSeenTableName)) {
                exportDataForAccountAndTable(out, columnsForTable, context);
                lastSeenTableName = column.getTableName();
                columnsForTable.clear();
            }
            columnsForTable.add(column);
        }
        exportDataForAccountAndTable(out, columnsForTable, context);
    }


    private void exportDataForAccountAndTable(final DatabaseExportOutputStream out, final List<ColumnInfo> columnsForTable, final InternalTenantContext context) {


        TableType tableType = TableType.OTHER;
        final String tableName = columnsForTable.get(0).getTableName();

        if (TableName.ACCOUNT.getTableName().equals(tableName)) {
            tableType = TableType.KB_ACCOUNT;
        } else if (TableName.ACCOUNT_HISTORY.getTableName().equals(tableName)) {
            tableType = TableType.KB_ACCOUNT_HISTORY;
        }

        boolean firstColumn = true;
        final StringBuilder queryBuilder = new StringBuilder("select ");
        for (final ColumnInfo column : columnsForTable) {
            if (!firstColumn) {
                queryBuilder.append(", ");
            } else {
                firstColumn = false;
            }

            queryBuilder.append(column.getColumnName());

            if (tableType == TableType.OTHER) {
                if (column.getColumnName().equals(TableType.KB_PER_ACCOUNT.getAccountRecordIdColumnName())) {
                    tableType = TableType.KB_PER_ACCOUNT;
                } else if (column.getColumnName().equals(TableType.NOTIFICATION.getAccountRecordIdColumnName())) {
                    tableType = TableType.NOTIFICATION;
                }
            }
        }

        // Don't export non-account specific tables
        if (tableType == TableType.OTHER) {
            return;
        }

        // Build the query - make sure to filter by account and tenant!
        queryBuilder.append(" from ")
                    .append(tableName)
                    .append(" where ")
                    .append(tableType.getAccountRecordIdColumnName())
                    .append(" = :accountRecordId and ")
                    .append(tableType.getTenantRecordIdColumnName())
                    .append("  = :tenantRecordId");

        // Notify the stream that we're about to write data for a different table
        out.newTable(tableName, columnsForTable);

        dbi.withHandle(new HandleCallback<Void>() {
            @Override
            public Void withHandle(final Handle handle) throws Exception {
                final ResultIterator<Map<String, Object>> iterator = handle.createQuery(queryBuilder.toString())
                                                                           .bind("accountRecordId", context.getAccountRecordId())
                                                                           .bind("tenantRecordId", context.getTenantRecordId())
                                                                           .iterator();
                try {
                    while (iterator.hasNext()) {
                        final Map<String, Object> row = iterator.next();

                        for (final String k : row.keySet()) {
                            final Object value = row.get(k);
                            // For h2, transform a JdbcBlob into a byte[]
                            // See also LowerToCamelBeanMapper
                            if (value instanceof Blob) {
                                final Blob blob = (Blob) value;
                                row.put(k, blob.getBytes(0, (int) blob.length()));
                            }
                        }

                        try {
                            out.write(row);
                        } catch (final IOException e) {
                            logger.warn("Unable to write row: {}", row, e);
                            throw e;
                        }
                    }
                } finally {
                    iterator.close();
                }
                return null;
            }
        });
    }
}