/*
 * Decompiled with CFR 0.152.
 */
package voldemort.server.rebalance.async;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.server.rebalance.async.DonorBasedRebalanceAsyncOperation;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.Pair;
import voldemort.versioning.Versioned;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Worker that drains a queue of key/value entries and pushes them to a single
 * destination node through {@code adminClient.updateEntries(...)}.
 *
 * <p>Entries are read from the queue through a {@link ResumableIterator}. The
 * iterator remembers every element it has handed out since the last
 * successful push, so that a push which fails with an {@link IOException}
 * cause can be replayed from the start of that batch after a short back-off.
 *
 * <p>The run loop ends once the iterator has seen
 * {@code DonorBasedRebalanceAsyncOperation.END}, which
 * {@link #requestCompletion()} places on the queue.
 */
public class DonorBasedRebalancePusherSlave
implements Runnable {
    protected static final Logger logger = Logger.getLogger(DonorBasedRebalancePusherSlave.class);

    /**
     * Back-off between a failed push and its retry, in milliseconds.
     * NOTE(review): the log messages used to say "5 minutes" while the code
     * slept for 30 seconds; the sleep duration is kept and the messages now
     * report the real value. Confirm which of the two was intended.
     */
    private static final long RETRY_WAIT_MS = 30000L;

    private final int nodeId;
    private final BlockingQueue<Pair<ByteArray, Versioned<byte[]>>> queue;
    private final String storeName;
    private final AdminClient adminClient;
    private final ResumableIterator<Pair<ByteArray, Versioned<byte[]>>> nodeIterator;

    /**
     * @param nodeId id of the node the entries are pushed to
     * @param queue queue the entries (and the END/BREAK markers) are taken from
     * @param storeName name of the store the entries belong to
     * @param adminClient client used to push the entries
     */
    public DonorBasedRebalancePusherSlave(int nodeId, BlockingQueue<Pair<ByteArray, Versioned<byte[]>>> queue, String storeName, AdminClient adminClient) {
        this.nodeId = nodeId;
        this.queue = queue;
        this.storeName = storeName;
        this.adminClient = adminClient;
        this.nodeIterator = new ResumableIterator<Pair<ByteArray, Versioned<byte[]>>>();
    }

    /**
     * Pushes batches to the destination node until the END marker is seen.
     *
     * <p>A {@link VoldemortException} whose cause is an {@link IOException}
     * switches the iterator into recovery mode and is retried after
     * {@link #RETRY_WAIT_MS}; any other {@link VoldemortException} is
     * propagated to the caller.
     *
     * @throws VoldemortException if a push fails for a reason other than an
     *         {@link IOException}
     */
    @Override
    public void run() throws VoldemortException {
        logger.info("DonorBasedRebalancePusherSlave begains to send partitions for store " + this.storeName + " to node " + this.nodeId);
        final long retryWaitSeconds = RETRY_WAIT_MS / 1000L;
        while (!this.nodeIterator.done) {
            // Scoped per iteration so that an interrupted back-off cannot leak
            // into the next iteration and trigger a spurious extra sleep.
            boolean needWait = false;
            try {
                this.nodeIterator.reset();
                this.adminClient.updateEntries(this.nodeId, this.storeName, this.nodeIterator, null);
                this.nodeIterator.purge();
            }
            catch (VoldemortException e) {
                if (!(e.getCause() instanceof IOException)) {
                    // Only I/O failures are retried; everything else is fatal.
                    throw e;
                }
                // Replay the elements handed out since the last purge.
                this.nodeIterator.setRecoveryMode();
                logger.error("Exception received while pushing entries for store " + this.storeName + " to remote node " + this.nodeId + ". Will retry again after " + retryWaitSeconds + " seconds", e);
                needWait = true;
            }
            if (!needWait) {
                continue;
            }
            try {
                logger.info("waiting for " + retryWaitSeconds + " seconds for the remote node to recover");
                Thread.sleep(RETRY_WAIT_MS);
            }
            catch (InterruptedException ignored) {
                // An interrupt only cuts the back-off short and the retry starts
                // immediately. The flag is deliberately not restored: this loop
                // has no interrupt-driven exit, and a set flag would only make
                // the next blocking queue.take() fail at once.
            }
        }
        logger.info("DonorBasedRebalancePusherSlave finished sending partitions for store " + this.storeName + " to node " + this.nodeId);
    }

    /**
     * Puts the END marker on the queue so that {@link #run()} terminates once
     * everything queued before it has been consumed. If the calling thread is
     * interrupted while waiting for queue space, the marker is not enqueued,
     * the failure is logged and the thread's interrupt status is restored.
     */
    public void requestCompletion() {
        try {
            this.queue.put(DonorBasedRebalanceAsyncOperation.END);
        }
        catch (InterruptedException e) {
            logger.info("Unable to send termination message to pusher slave for node " + this.nodeId + " due to the following reason: " + e.getMessage());
            // Keep the interruption visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Iterator over the enclosing slave's queue that can replay the elements
     * it has handed out since the last {@link #purge()}.
     *
     * <p>Two marker elements end an iteration: {@code END} also marks the
     * iterator as permanently done, while {@code BREAK} only makes
     * {@link #hasNext()} return {@code false} until {@link #reset()} discards
     * it (presumably a batch boundary; confirm against the producer).
     *
     * <p>The type parameter is unused and kept only for compatibility.
     */
    class ResumableIterator<T>
    implements ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> {
        /** Set once END has been observed; the iterator never yields again. */
        private boolean done = false;
        /** While true, elements come from {@link #tentativeList}, not the queue. */
        private boolean recoveryModeOn = false;
        /** Index of the next element to replay from {@link #tentativeList}. */
        private int recoveryPosition = 0;
        /** Element fetched ahead by {@link #hasNext()} and not yet returned. */
        private Pair<ByteArray, Versioned<byte[]>> currentElem = null;
        /** Every element taken from the queue since the last successful purge. */
        private ArrayList<Pair<ByteArray, Versioned<byte[]>>> tentativeList = Lists.newArrayList();

        ResumableIterator() {
        }

        /** No resources to release. */
        @Override
        public void close() {
        }

        /**
         * Switches to replaying the remembered elements from the beginning.
         * Does nothing when no elements are remembered.
         */
        public void setRecoveryMode() {
            if (this.tentativeList.size() > 0) {
                this.recoveryModeOn = true;
                this.recoveryPosition = 0;
            }
        }

        /**
         * Forgets the remembered elements. While a replay is still in progress
         * the list is kept and an error is logged instead.
         */
        public void purge() {
            if (!this.recoveryModeOn) {
                this.tentativeList.clear();
            } else {
                logger.error("purge called while recovery mode is on!!!!!");
            }
        }

        /** Discards the look-ahead element (for example a pending BREAK marker). */
        public void reset() {
            this.currentElem = null;
        }

        /**
         * Fetches the next element if none is pending and reports whether it
         * is a real entry.
         *
         * @return {@code false} if the iterator is done, the pending element is
         *         END (which also marks the iterator done) or BREAK, or the
         *         thread was interrupted while waiting; {@code true} otherwise
         */
        @Override
        public boolean hasNext() {
            if (this.done) {
                return false;
            }
            while (this.currentElem == null) {
                try {
                    this.currentElem = this.getNextElem();
                }
                catch (InterruptedException e) {
                    logger.info("hasNext is interrupted while waiting for the next elem, existing...");
                    break;
                }
            }
            if (this.currentElem == null) {
                // Interrupted before anything could be fetched.
                return false;
            }
            if (this.currentElem.equals(DonorBasedRebalanceAsyncOperation.END)) {
                this.done = true;
                return false;
            }
            return !this.currentElem.equals(DonorBasedRebalanceAsyncOperation.BREAK);
        }

        /**
         * Returns the next real entry.
         *
         * @throws NoSuchElementException if {@link #hasNext()} is {@code false},
         *         i.e. the iterator is done, the next element is an END/BREAK
         *         marker, or the wait for an element was interrupted
         */
        @Override
        public Pair<ByteArray, Versioned<byte[]>> next() {
            // Delegating to hasNext() guarantees that neither null nor a marker
            // element is ever handed to the consumer as data.
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            Pair<ByteArray, Versioned<byte[]>> returnValue = this.currentElem;
            this.currentElem = null;
            return returnValue;
        }

        /**
         * Supplies the next raw element: from the remembered list while in
         * recovery mode, otherwise by blocking on the queue. Elements taken
         * from the queue are remembered for a possible later replay.
         *
         * @throws InterruptedException if interrupted while waiting on the queue
         */
        private Pair<ByteArray, Versioned<byte[]>> getNextElem() throws InterruptedException {
            Pair<ByteArray, Versioned<byte[]>> retValue = null;
            if (this.recoveryModeOn) {
                retValue = this.tentativeList.get(this.recoveryPosition);
                ++this.recoveryPosition;
                if (this.recoveryPosition >= this.tentativeList.size()) {
                    // Replay finished; continue with fresh elements from the queue.
                    this.recoveryModeOn = false;
                }
                if (retValue == null) {
                    logger.error("No elements found in the recovery list while in the recovery mode!\n  recovery list size: " + this.tentativeList.size() + "  recovery position: " + this.recoveryPosition);
                }
            } else {
                retValue = DonorBasedRebalancePusherSlave.this.queue.take();
                this.tentativeList.add(retValue);
            }
            return retValue;
        }

        /**
         * Removal is not supported.
         *
         * @throws VoldemortException always
         */
        @Override
        public void remove() {
            throw new VoldemortException("Remove not supported");
        }
    }
}

