/*
 * Decompiled with CFR 0.152.
 */
package voldemort.server.protocol.admin;

import com.google.protobuf.Message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.StreamRequestHandler;
import voldemort.server.protocol.admin.AdminServiceRequestHandler;
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.readonly.ReadOnlyStorageEngine;
import voldemort.store.stats.StreamStats;
import voldemort.utils.EventThrottler;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;

/**
 * Streams the data and index files of a read-only store to an admin client,
 * one (replica type, partition) tuple at a time.
 *
 * <p>The handler is a small state machine driven by repeated calls to
 * {@link #handleRequest(DataInputStream, DataOutputStream)}:
 * <ul>
 * <li>{@code NEXT_PARTITION} picks the next tuple and looks up how many
 * chunks it has;</li>
 * <li>{@code SEND_DATA_FILE} sends the {@code .data} file of the current
 * chunk, at most one block per call;</li>
 * <li>{@code SEND_INDEX_FILE} sends the matching {@code .index} file, then
 * moves on to the next chunk or the next tuple.</li>
 * </ul>
 * Files are named {@code <partition>_<replica>_<chunk>.data|.index} inside the
 * store's current directory.
 */
public class FetchPartitionFileStreamRequestHandler
implements StreamRequestHandler {
    private final VAdminProto.FetchPartitionFilesRequest request;
    private final EventThrottler throttler;
    private final File storeDir;
    private final Logger logger = Logger.getLogger(this.getClass());
    private final long blockSize;
    private final StreamStats stats;
    private final StreamStats.Handle handle;
    private final Iterator<Pair<Integer, Integer>> partitionIterator;
    private FetchStatus fetchStatus;
    private int currentChunkId;
    private int numChunks;
    private Pair<Integer, Integer> currentPair;
    private File indexFile;
    private File dataFile;
    private ChunkedFileWriter chunkedFileWriter;
    private final Set<Pair<Integer, Integer>> replicaToPartitionTuples;
    private final HashMap<Object, Integer> bucketToNumChunks;
    private final boolean nioEnabled;

    /**
     * Prepares a handler for the given fetch request.
     *
     * @param request the fetch request naming the store and the
     *        replica-to-partition list to stream
     * @param metadataStore used to look up the store definition
     * @param voldemortConfig source of the block size, throttle rate and
     *        connector mode
     * @param storeRepository used to locate the read-only storage engine
     * @param stats stream statistics; a FETCH_FILE handle is created from it
     * @throws VoldemortException if the requested store is not of type
     *         "read-only"
     */
    protected FetchPartitionFileStreamRequestHandler(VAdminProto.FetchPartitionFilesRequest request, MetadataStore metadataStore, VoldemortConfig voldemortConfig, StoreRepository storeRepository, StreamStats stats) {
        this.request = request;
        StoreDefinition storeDef = metadataStore.getStoreDef(request.getStore());
        // Constant-first equals() is null-safe: a missing type is rejected
        // with the same exception instead of a NullPointerException.
        if (!"read-only".equals(storeDef.getType())) {
            throw new VoldemortException("Should be fetching partition files only for read-only stores");
        }
        HashMap<Integer, List<Integer>> replicaToPartitionList = ProtoUtils.decodePartitionTuple(request.getReplicaToPartitionList());
        this.replicaToPartitionTuples = RebalanceUtils.flattenPartitionTuples(replicaToPartitionList);
        ReadOnlyStorageEngine storageEngine = AdminServiceRequestHandler.getReadOnlyStorageEngine(metadataStore, storeRepository, request.getStore());
        this.bucketToNumChunks = storageEngine.getChunkedFileSet().getChunkIdToNumChunks();
        // Upper bound on the bytes sent per streamFile() call; the second
        // argument is presumably the fallback when the property is unset.
        this.blockSize = voldemortConfig.getAllProps().getLong("partition.buffer.size.bytes", voldemortConfig.getAdminSocketBufferSize());
        this.storeDir = new File(storageEngine.getCurrentDirPath());
        this.throttler = new EventThrottler(voldemortConfig.getStreamMaxReadBytesPerSec());
        this.stats = stats;
        this.handle = stats.makeHandle(StreamStats.Operation.FETCH_FILE, replicaToPartitionList);
        this.partitionIterator = Collections.unmodifiableSet(this.replicaToPartitionTuples).iterator();
        this.fetchStatus = FetchStatus.NEXT_PARTITION;
        this.currentChunkId = 0;
        this.indexFile = null;
        this.dataFile = null;
        this.chunkedFileWriter = null;
        this.numChunks = 0;
        this.nioEnabled = voldemortConfig.getUseNioConnector();
    }

    /**
     * @return always {@code WRITING}
     */
    public StreamRequestHandler.StreamRequestDirection getDirection() {
        return StreamRequestHandler.StreamRequestDirection.WRITING;
    }

    /**
     * Writes the end-of-stream marker and releases any file that is still
     * open, for example because streaming was aborted mid-file.
     *
     * @param outputStream stream the end-of-stream marker is written to
     * @throws IOException if the marker cannot be written
     */
    public final void close(DataOutputStream outputStream) throws IOException {
        try {
            ProtoUtils.writeEndOfStream(outputStream);
        } finally {
            this.releaseOpenFile();
        }
    }

    /**
     * Logs the failure and releases any file that is still open. Nothing is
     * written to {@code outputStream}.
     *
     * @param outputStream unused
     * @param e the failure being reported
     * @throws IOException declared by the interface; not thrown here
     */
    public final void handleError(DataOutputStream outputStream, VoldemortException e) throws IOException {
        this.logger.error("handleFetchPartitionFilesEntries failed for request(" + this.request.toString() + ")", e);
        this.releaseOpenFile();
    }

    /**
     * Runs one step of the state machine according to the current fetch
     * status.
     *
     * @param inputStream unused
     * @param outputStream stream the file headers and contents are written to
     * @return {@code COMPLETE} once all tuples have been streamed, otherwise
     *         {@code WRITING}
     * @throws IOException if reading a file or writing to the stream fails
     * @throws VoldemortException on an unknown fetch status or a missing
     *         bucket
     */
    public StreamRequestHandler.StreamRequestHandlerState handleRequest(DataInputStream inputStream, DataOutputStream outputStream) throws IOException {
        StreamRequestHandler.StreamRequestHandlerState handlerState = StreamRequestHandler.StreamRequestHandlerState.WRITING;
        switch (this.fetchStatus) {
            case NEXT_PARTITION: {
                handlerState = this.handleNextPartition();
                break;
            }
            case SEND_DATA_FILE: {
                this.handleSendDataFile(outputStream);
                break;
            }
            case SEND_INDEX_FILE: {
                this.handleSendIndexFile();
                break;
            }
            default: {
                throw new VoldemortException("Invalid fetch status " + this.fetchStatus);
            }
        }
        return handlerState;
    }

    /**
     * Sends the next block of the current index file. When the file is fully
     * sent, advances to the next chunk, or to the next tuple if this was the
     * last chunk.
     */
    private void handleSendIndexFile() throws IOException {
        if (0L == this.chunkedFileWriter.streamFile()) {
            this.logger.info("Completed streaming " + this.indexFile.getAbsolutePath());
            this.chunkedFileWriter.close();
            ++this.currentChunkId;
            // Clearing both files makes handleSendDataFile() open the next chunk.
            this.indexFile = null;
            this.dataFile = null;
            this.handle.incrementEntriesScanned();
            this.fetchStatus = this.currentChunkId >= this.numChunks ? FetchStatus.NEXT_PARTITION : FetchStatus.SEND_DATA_FILE;
        } else {
            this.fetchStatus = FetchStatus.SEND_INDEX_FILE;
        }
    }

    /**
     * Opens the current chunk's data file if needed and sends its next block.
     * When the data file is fully sent, opens the index file, writes its
     * header and switches to {@code SEND_INDEX_FILE}.
     */
    private void handleSendDataFile(DataOutputStream outputStream) throws IOException {
        if (null == this.dataFile && null == this.indexFile) {
            // File name layout: <partition>_<replica>_<chunk>
            String fileName = Integer.toString(this.currentPair.getSecond()) + "_" + Integer.toString(this.currentPair.getFirst()) + "_" + Integer.toString(this.currentChunkId);
            this.dataFile = new File(this.storeDir, fileName + ".data");
            this.indexFile = new File(this.storeDir, fileName + ".index");
            this.chunkedFileWriter = new ChunkedFileWriter(this.dataFile, outputStream);
            this.logger.info("Streaming " + this.dataFile.getAbsolutePath());
            this.chunkedFileWriter.writeHeader();
        }
        if (0L == this.chunkedFileWriter.streamFile()) {
            this.logger.info("Completed streaming " + this.dataFile.getAbsolutePath());
            this.chunkedFileWriter.close();
            this.chunkedFileWriter = new ChunkedFileWriter(this.indexFile, outputStream);
            this.logger.info("Streaming " + this.indexFile.getAbsolutePath());
            this.chunkedFileWriter.writeHeader();
            this.fetchStatus = FetchStatus.SEND_INDEX_FILE;
        } else {
            this.fetchStatus = FetchStatus.SEND_DATA_FILE;
        }
    }

    /**
     * Moves to the next (replica, partition) tuple, or finishes the stream
     * when none are left.
     *
     * @return {@code COMPLETE} when every tuple has been streamed, otherwise
     *         {@code WRITING}
     * @throws VoldemortException if the store has no bucket for the tuple
     */
    private StreamRequestHandler.StreamRequestHandlerState handleNextPartition() {
        StreamRequestHandler.StreamRequestHandlerState handlerState = StreamRequestHandler.StreamRequestHandlerState.WRITING;
        if (this.partitionIterator.hasNext()) {
            this.currentPair = this.partitionIterator.next();
            this.currentChunkId = 0;
            // Buckets are keyed by (partition, replica), the reverse of the
            // (replica, partition) order of the iterated tuples.
            Integer chunkCount = this.bucketToNumChunks.get(Pair.create(this.currentPair.getSecond(), this.currentPair.getFirst()));
            if (chunkCount == null) {
                throw new VoldemortException("Bucket [ partition = " + this.currentPair.getSecond() + ", replica = " + this.currentPair.getFirst() + " ] does not exist for store " + this.request.getStore());
            }
            this.numChunks = chunkCount.intValue();
            this.indexFile = null;
            this.dataFile = null;
            this.fetchStatus = FetchStatus.SEND_DATA_FILE;
        } else {
            this.logger.info("Finished streaming files for partitions tuples " + this.replicaToPartitionTuples);
            this.stats.closeHandle(this.handle);
            handlerState = StreamRequestHandler.StreamRequestHandlerState.COMPLETE;
        }
        return handlerState;
    }

    /**
     * Closes the file channel of the current writer, if any, without touching
     * the output stream. Safe to call repeatedly: closing an already-closed
     * channel has no effect.
     */
    private void releaseOpenFile() {
        ChunkedFileWriter writer = this.chunkedFileWriter;
        if (writer == null) {
            return;
        }
        try {
            writer.closeFile();
        } catch (IOException e) {
            // Best effort: do not mask the failure that triggered the cleanup.
            this.logger.warn("Failed to close " + writer.fileToWrite.getAbsolutePath(), e);
        }
    }

    /**
     * Sends a single file over the output stream: a {@code FileEntry} header
     * followed by the raw file bytes, written in blocks of at most
     * {@code blockSize} bytes.
     */
    private class ChunkedFileWriter {
        private final File fileToWrite;
        private final DataOutputStream outStream;
        private final FileChannel dataChannel;
        private final WritableByteChannel outChannel;
        private long currentPos;

        /**
         * Opens {@code fileToWrite} for reading and wraps {@code stream} in a
         * channel.
         *
         * @throws FileNotFoundException if the file cannot be opened
         */
        public ChunkedFileWriter(File fileToWrite, DataOutputStream stream) throws FileNotFoundException {
            this.fileToWrite = fileToWrite;
            this.outStream = stream;
            this.dataChannel = new FileInputStream(fileToWrite).getChannel();
            this.outChannel = Channels.newChannel(this.outStream);
            this.currentPos = 0L;
        }

        /**
         * Closes the file channel and, when the NIO connector is enabled, the
         * output channel as well.
         */
        public void close() throws IOException {
            this.dataChannel.close();
            // NOTE(review): presumably skipped for the blocking connector
            // because closing the channel closes the wrapped stream - confirm.
            if (FetchPartitionFileStreamRequestHandler.this.nioEnabled) {
                this.outChannel.close();
            }
        }

        /** Closes only the file channel, leaving the output side untouched. */
        void closeFile() throws IOException {
            this.dataChannel.close();
        }

        /** Writes the {@code FileEntry} header carrying the file name and size. */
        public void writeHeader() throws IOException {
            VAdminProto.FileEntry response = VAdminProto.FileEntry.newBuilder().setFileName(this.fileToWrite.getName()).setFileSizeBytes(this.dataChannel.size()).build();
            ProtoUtils.writeMessage(this.outStream, (Message)response);
            FetchPartitionFileStreamRequestHandler.this.throttler.maybeThrottle(response.getSerializedSize());
        }

        /**
         * Transfers up to one block of the file to the output channel.
         *
         * @return number of bytes still to be sent; {@code 0} once the file
         *         has been streamed completely
         */
        public long streamFile() throws IOException {
            long bytesRemaining = this.dataChannel.size() - this.currentPos;
            if (0L < bytesRemaining) {
                long bytesToWrite = Math.min(bytesRemaining, FetchPartitionFileStreamRequestHandler.this.blockSize);
                // transferTo may move fewer bytes than requested; advance only
                // by what was actually written.
                long bytesWritten = this.dataChannel.transferTo(this.currentPos, bytesToWrite, this.outChannel);
                this.currentPos += bytesWritten;
                FetchPartitionFileStreamRequestHandler.this.logger.debug(bytesWritten + " bytes written");
                // Clamp so an oversized block cannot wrap to a negative count.
                FetchPartitionFileStreamRequestHandler.this.throttler.maybeThrottle((int)Math.min(bytesWritten, (long)Integer.MAX_VALUE));
            }
            return this.dataChannel.size() - this.currentPos;
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    /** Steps of the streaming state machine. */
    private static enum FetchStatus {
        NEXT_PARTITION,
        SEND_DATA_FILE,
        SEND_INDEX_FILE;

    }
}

