/*
 * Decompiled with CFR 0.152.
 */
package voldemort.server.rebalance.async;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import voldemort.VoldemortException;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.cluster.Cluster;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.rebalance.Rebalancer;
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.server.rebalance.async.DonorBasedRebalancePusherSlave;
import voldemort.server.rebalance.async.RebalanceAsyncOperation;
import voldemort.store.StorageEngine;
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
import voldemort.versioning.Versioned;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class DonorBasedRebalanceAsyncOperation
extends RebalanceAsyncOperation {
    public static final Pair<ByteArray, Versioned<byte[]>> END = Pair.create(new ByteArray("END".getBytes()), new Versioned<byte[]>("END".getBytes()));
    public static final Pair<ByteArray, Versioned<byte[]>> BREAK = Pair.create(new ByteArray("BREAK".getBytes()), new Versioned<byte[]>("BREAK".getBytes()));
    private static final int FETCHUPDATE_BATCH_SIZE = 1000;
    private static final int SCAN_PROGRESS_COUNT = 100000;
    private final List<RebalancePartitionsInfo> stealInfos;
    private final StoreRepository storeRepository;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Cluster initialCluster;
    private final Cluster targetCluster;
    private final HashMultimap<String, Pair<Integer, HashMap<Integer, List<Integer>>>> storeToNodePartitionMapping;
    private Map<String, Pair<ExecutorService, List<DonorBasedRebalancePusherSlave>>> updatePushSlavePool;

    private HashMultimap<String, Pair<Integer, HashMap<Integer, List<Integer>>>> groupByStores(List<RebalancePartitionsInfo> stealInfos) {
        HashMultimap returnMap = HashMultimap.create();
        for (RebalancePartitionsInfo info : stealInfos) {
            int stealerNodeId = info.getStealerId();
            for (Map.Entry<String, HashMap<Integer, List<Integer>>> entry : info.getStoreToReplicaToAddPartitionList().entrySet()) {
                returnMap.put((Object)entry.getKey(), Pair.create(stealerNodeId, entry.getValue()));
            }
        }
        return returnMap;
    }

    public DonorBasedRebalanceAsyncOperation(Rebalancer rebalancer, StoreRepository storeRepository, VoldemortConfig voldemortConfig, MetadataStore metadataStore, int requestId, List<RebalancePartitionsInfo> stealInfos) {
        super(rebalancer, voldemortConfig, metadataStore, requestId, "Donor based rebalance : " + stealInfos);
        this.storeRepository = storeRepository;
        this.stealInfos = stealInfos;
        this.targetCluster = metadataStore.getCluster();
        this.initialCluster = stealInfos.get(0).getInitialCluster();
        this.storeToNodePartitionMapping = this.groupByStores(stealInfos);
        this.updatePushSlavePool = Collections.synchronizedMap(new HashMap());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void operate() throws Exception {
        this.adminClient = RebalanceUtils.createTempAdminClient(this.voldemortConfig, this.metadataStore.getCluster(), this.voldemortConfig.getMaxParallelStoresRebalancing());
        final CopyOnWriteArrayList<Exception> failures = new CopyOnWriteArrayList<Exception>();
        final ConcurrentLinkedQueue storesRebalancing = new ConcurrentLinkedQueue();
        final ConcurrentLinkedQueue storesCompleted = new ConcurrentLinkedQueue();
        final int totalStoresCount = this.storeToNodePartitionMapping.keySet().size();
        try {
            for (final String storeName : ImmutableList.copyOf((Iterable)this.storeToNodePartitionMapping.keySet())) {
                this.executors.submit(new Runnable(){

                    public void run() {
                        try {
                            Set stealerNodeToMappingTuples = DonorBasedRebalanceAsyncOperation.this.storeToNodePartitionMapping.get((Object)storeName);
                            boolean isReadOnlyStore = DonorBasedRebalanceAsyncOperation.this.metadataStore.getStoreDef(storeName).getType().compareTo("read-only") == 0;
                            storesRebalancing.add(storeName);
                            DonorBasedRebalanceAsyncOperation.this.updateStatus(DonorBasedRebalanceAsyncOperation.this.getHeader(DonorBasedRebalanceAsyncOperation.this.stealInfos) + "Completed working on " + storesCompleted.size() + " out of " + totalStoresCount + " stores. Still rebalancing " + storesRebalancing);
                            DonorBasedRebalanceAsyncOperation.this.rebalanceStore(storeName, DonorBasedRebalanceAsyncOperation.this.adminClient, stealerNodeToMappingTuples, isReadOnlyStore);
                            for (Pair entry : stealerNodeToMappingTuples) {
                                DonorBasedRebalanceAsyncOperation.this.adminClient.deleteStoreRebalanceState(DonorBasedRebalanceAsyncOperation.this.metadataStore.getNodeId(), (Integer)entry.getFirst(), storeName);
                                RebalanceAsyncOperation.logger.info((Object)("Removed rebalance state for store " + storeName + " : " + DonorBasedRebalanceAsyncOperation.this.metadataStore.getNodeId() + " ---> " + entry.getFirst()));
                            }
                            storesRebalancing.remove(storeName);
                            storesCompleted.add(storeName);
                            DonorBasedRebalanceAsyncOperation.this.updateStatus(DonorBasedRebalanceAsyncOperation.this.getHeader(DonorBasedRebalanceAsyncOperation.this.stealInfos) + "Completed working on " + storesCompleted.size() + " out of " + totalStoresCount + " stores. Still rebalancing " + storesRebalancing);
                        }
                        catch (Exception e) {
                            RebalanceAsyncOperation.logger.error((Object)(DonorBasedRebalanceAsyncOperation.this.getHeader(DonorBasedRebalanceAsyncOperation.this.stealInfos) + "Error while rebalancing for store " + storeName + " - " + e.getMessage()), (Throwable)e);
                            failures.add(e);
                        }
                    }
                });
            }
            this.waitForShutdown();
            if (storesCompleted.size() != totalStoresCount) {
                logger.error((Object)(this.getHeader(this.stealInfos) + "Could not complete all stores. Completed stores - " + storesCompleted));
                throw new VoldemortRebalancingException(this.getHeader(this.stealInfos) + "Could not complete all stores. Completed stores - " + storesCompleted, failures);
            }
            logger.info((Object)(this.getHeader(this.stealInfos) + "Rebalance of " + this.stealInfos + " completed successfully for all " + totalStoresCount + " stores"));
            Object var8_7 = null;
            this.adminClient.stop();
            this.adminClient = null;
        }
        catch (Throwable throwable) {
            Object var8_8 = null;
            this.adminClient.stop();
            this.adminClient = null;
            for (RebalancePartitionsInfo stealInfo : this.stealInfos) {
                this.rebalancer.releaseRebalancingPermit(stealInfo.getStealerId());
            }
            throw throwable;
        }
        for (RebalancePartitionsInfo stealInfo : this.stealInfos) {
            this.rebalancer.releaseRebalancingPermit(stealInfo.getStealerId());
        }
    }

    private String getHeader(List<RebalancePartitionsInfo> stealInfos) {
        ArrayList stealerNodeIds = Lists.newArrayList();
        for (RebalancePartitionsInfo info : stealInfos) {
            stealerNodeIds.add(info.getStealerId());
        }
        return " Donor " + stealInfos.get(0).getDonorId() + ", Stealer " + stealerNodeIds + "] ";
    }

    private void rebalanceStore(String storeName, AdminClient adminClient, Set<Pair<Integer, HashMap<Integer, List<Integer>>>> stealerNodeToMappingTuples, boolean isReadOnlyStore) {
        StorageEngine<ByteArray, byte[], byte[]> storageEngine = this.storeRepository.getStorageEngine(storeName);
        StoreDefinition storeDef = this.metadataStore.getStoreDef(storeName);
        ArrayList storePushSlaves = Lists.newArrayList();
        ExecutorService pushSlavesExecutor = Executors.newCachedThreadPool(new ThreadFactory(){

            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName(r.getClass().getName());
                return thread;
            }
        });
        this.updatePushSlavePool.put(storeName, new Pair<ExecutorService, ArrayList>(pushSlavesExecutor, storePushSlaves));
        if (isReadOnlyStore) {
            throw new VoldemortException("Donor-based rebalancing for read-only store is currently not supported!");
        }
        HashMap nodeToQueue = Maps.newHashMap();
        HashSet optimizedStealerNodeToMappingTuples = Sets.newHashSet();
        if (this.voldemortConfig.getRebalancingOptimization() && !storageEngine.isPartitionAware()) {
            for (Pair<Integer, HashMap<Integer, List<Integer>>> entry : stealerNodeToMappingTuples) {
                HashMap<Integer, List<Integer>> optimizedReplicaToPartition = RebalanceUtils.getOptimizedReplicaToPartitionList(entry.getFirst(), this.initialCluster, storeDef, entry.getSecond());
                if (optimizedReplicaToPartition.size() <= 0) continue;
                optimizedStealerNodeToMappingTuples.add(Pair.create(entry.getFirst(), optimizedReplicaToPartition));
            }
        } else {
            optimizedStealerNodeToMappingTuples.addAll(stealerNodeToMappingTuples);
        }
        if (optimizedStealerNodeToMappingTuples.size() <= 0) {
            return;
        }
        for (Pair<Integer, HashMap<Integer, List<Integer>>> tuple : stealerNodeToMappingTuples) {
            SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>> queue = new SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>();
            nodeToQueue.put(tuple.getFirst(), queue);
            String jobName = "DonorBasedRebalancePusherSlave for store " + storeName + " on node " + tuple.getFirst();
            DonorBasedRebalancePusherSlave updatePushSlave = new DonorBasedRebalancePusherSlave(tuple.getFirst(), queue, storeName, adminClient);
            storePushSlaves.add(updatePushSlave);
            pushSlavesExecutor.execute(updatePushSlave);
            logger.info((Object)("Started a thread for " + jobName));
        }
        this.fetchEntriesForStealers(storageEngine, optimizedStealerNodeToMappingTuples, storeDef, nodeToQueue, storeName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void fetchEntriesForStealers(StorageEngine<ByteArray, byte[], byte[]> storageEngine, Set<Pair<Integer, HashMap<Integer, List<Integer>>>> optimizedStealerNodeToMappingTuples, StoreDefinition storeDef, HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> nodeToQueue, String storeName) {
        int scanned = 0;
        int[] fetched = new int[this.targetCluster.getNumberOfNodes()];
        long startTime = System.currentTimeMillis();
        ClosableIterator<ByteArray> keys = storageEngine.keys();
        try {
            try {
                while (this.running.get() && keys.hasNext()) {
                    ByteArray key = (ByteArray)keys.next();
                    ++scanned;
                    List<Integer> nodeIds = RebalanceUtils.checkKeyBelongsToPartition(key.get(), optimizedStealerNodeToMappingTuples, this.targetCluster, storeDef);
                    if (nodeIds.size() > 0) {
                        List<Versioned<byte[]>> values = storageEngine.get(key, null);
                        this.putAll(nodeIds, key, values, nodeToQueue, fetched);
                    }
                    if (0 != scanned % 100000) continue;
                    this.printProgress(scanned, fetched, startTime, storeName);
                }
                this.terminateAllSlaves(storeName);
            }
            catch (InterruptedException e) {
                logger.info((Object)"InterruptedException received while sending entries to remote nodes, the process is terminating...");
                this.terminateAllSlavesAsync(storeName);
                Object var15_15 = null;
                this.close(keys, storeName, scanned, fetched, startTime);
            }
            Object var15_14 = null;
            this.close(keys, storeName, scanned, fetched, startTime);
        }
        catch (Throwable throwable) {
            Object var15_16 = null;
            this.close(keys, storeName, scanned, fetched, startTime);
            throw throwable;
        }
    }

    private void putAll(List<Integer> dests, ByteArray key, List<Versioned<byte[]>> values, HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> nodeToQueue, int[] fetched) throws InterruptedException {
        for (Versioned<byte[]> value : values) {
            Iterator<Integer> i$ = dests.iterator();
            while (i$.hasNext()) {
                int nodeId;
                int n = nodeId = i$.next().intValue();
                fetched[n] = fetched[n] + 1;
                nodeToQueue.get(nodeId).put(Pair.create(key, value));
                if (0 != fetched[nodeId] % 1000) continue;
                nodeToQueue.get(nodeId).put(BREAK);
            }
        }
    }

    private void printProgress(int scanned, int[] fetched, long startTime, String storeName) {
        logger.info((Object)("Successfully scanned " + scanned + " tuples in " + (System.currentTimeMillis() - startTime) / 1000L + " s"));
        for (int i = 0; i < fetched.length; ++i) {
            logger.info((Object)(fetched[i] + " tuples fetched for store '" + storeName + " for node " + i));
        }
    }

    private void close(ClosableIterator<ByteArray> keys, String storeName, int scanned, int[] fetched, long startTime) {
        this.printProgress(scanned, fetched, startTime, storeName);
        if (null != keys) {
            keys.close();
        }
    }

    private void terminateAllSlaves(String storeName) {
        logger.info((Object)"Terminating DonorBasedRebalancePushSlaves...");
        ExecutorService pushSlavesExecutor = this.updatePushSlavePool.get(storeName).getFirst();
        List<DonorBasedRebalancePusherSlave> pushSlaves = this.updatePushSlavePool.get(storeName).getSecond();
        Iterator<DonorBasedRebalancePusherSlave> it = pushSlaves.iterator();
        while (it.hasNext()) {
            it.next().requestCompletion();
        }
        pushSlavesExecutor.shutdown();
        try {
            if (pushSlavesExecutor.awaitTermination(30L, TimeUnit.MINUTES)) {
                logger.info((Object)"All DonorBasedRebalancePushSlaves terminated successfully.");
            } else {
                logger.warn((Object)"Timed out while waiting for pusher slaves to shutdown!!!");
            }
        }
        catch (InterruptedException e) {
            logger.warn((Object)"Interrupted while waiting for pusher slaves to shutdown!!!");
        }
        logger.info((Object)"DonorBasedRebalancingOperation existed.");
    }

    private void terminateAllSlavesAsync(String storeName) {
        logger.info((Object)"Terminating DonorBasedRebalancePushSlaves asynchronously.");
        if (null == storeName) {
            for (Pair<ExecutorService, List<DonorBasedRebalancePusherSlave>> pair : this.updatePushSlavePool.values()) {
                ExecutorService pushSlavesExecutor = pair.getFirst();
                List<DonorBasedRebalancePusherSlave> pushSlaves = pair.getSecond();
                Iterator<DonorBasedRebalancePusherSlave> it = pushSlaves.iterator();
                while (it.hasNext()) {
                    it.next().requestCompletion();
                }
                pushSlavesExecutor.shutdownNow();
            }
        } else {
            ExecutorService pushSlavesExecutor = this.updatePushSlavePool.get(storeName).getFirst();
            List<DonorBasedRebalancePusherSlave> pushSlaves = this.updatePushSlavePool.get(storeName).getSecond();
            Iterator<DonorBasedRebalancePusherSlave> it = pushSlaves.iterator();
            while (it.hasNext()) {
                it.next().requestCompletion();
            }
            pushSlavesExecutor.shutdownNow();
        }
        logger.info((Object)"DonorBasedRebalancingAsyncOperation existed.");
    }

    @Override
    public void stop() {
        this.running.set(false);
        this.updateStatus(this.getHeader(this.stealInfos) + "Stop called on donor-based rebalance operation");
        logger.info((Object)(this.getHeader(this.stealInfos) + "Stop called on donor-based rebalance operation"));
        this.terminateAllSlavesAsync(null);
        this.executors.shutdownNow();
    }
}

