/*
 * Decompiled with CFR 0.152.
 */
package voldemort.server.protocol.admin;

import com.google.protobuf.Message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
import voldemort.VoldemortException;
import voldemort.client.protocol.VoldemortFilter;
import voldemort.client.protocol.admin.filter.DefaultVoldemortFilter;
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.StreamRequestHandler;
import voldemort.server.protocol.admin.AdminServiceRequestHandler;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.StorageEngine;
import voldemort.store.stats.StreamStats;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.EventThrottler;
import voldemort.utils.NetworkClassLoader;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Versioned;

public class UpdatePartitionEntriesStreamRequestHandler
implements StreamRequestHandler {
    private VAdminProto.UpdatePartitionEntriesRequest request;
    private final VAdminProto.UpdatePartitionEntriesResponse.Builder responseBuilder = VAdminProto.UpdatePartitionEntriesResponse.newBuilder();
    private final ErrorCodeMapper errorCodeMapper;
    private final EventThrottler throttler;
    private final VoldemortFilter filter;
    private final StorageEngine<ByteArray, byte[], byte[]> storageEngine;
    private int counter;
    private final long startTime;
    private final StreamStats stats;
    private final StreamStats.Handle handle;
    private final Logger logger = Logger.getLogger(this.getClass());

    public UpdatePartitionEntriesStreamRequestHandler(VAdminProto.UpdatePartitionEntriesRequest request, ErrorCodeMapper errorCodeMapper, VoldemortConfig voldemortConfig, StoreRepository storeRepository, NetworkClassLoader networkClassLoader, StreamStats stats) {
        this.request = request;
        this.errorCodeMapper = errorCodeMapper;
        this.storageEngine = AdminServiceRequestHandler.getStorageEngine(storeRepository, request.getStore());
        this.throttler = new EventThrottler(voldemortConfig.getStreamMaxReadBytesPerSec());
        this.filter = request.hasFilter() ? AdminServiceRequestHandler.getFilterFromRequest(request.getFilter(), voldemortConfig, networkClassLoader) : new DefaultVoldemortFilter();
        this.startTime = System.currentTimeMillis();
        this.stats = stats;
        this.handle = stats.makeHandle(StreamStats.Operation.UPDATE, new HashMap<Integer, List<Integer>>());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public StreamRequestHandler.StreamRequestHandlerState handleRequest(DataInputStream inputStream, DataOutputStream outputStream) throws IOException {
        Versioned<byte[]> value;
        VAdminProto.PartitionEntry partitionEntry;
        ByteArray key;
        long startNs = System.nanoTime();
        if (this.request == null) {
            int size = 0;
            try {
                size = inputStream.readInt();
            }
            catch (EOFException e) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace((Object)"Incomplete read for message size");
                }
                this.stats.recordNetworkTime(this.handle, System.nanoTime() - startNs);
                return StreamRequestHandler.StreamRequestHandlerState.INCOMPLETE_READ;
            }
            if (size == -1) {
                long totalTime = (System.currentTimeMillis() - this.startTime) / 1000L;
                this.logger.info((Object)("Update entries successfully updated " + this.counter + " entries for store '" + this.storageEngine.getName() + "' in " + totalTime + " s"));
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace((Object)"Message size -1, completed partition update");
                }
                this.stats.recordNetworkTime(this.handle, System.nanoTime() - startNs);
                this.stats.closeHandle(this.handle);
                return StreamRequestHandler.StreamRequestHandlerState.COMPLETE;
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace((Object)("UpdatePartitionEntriesRequest message size: " + size));
            }
            byte[] input = new byte[size];
            try {
                try {
                    ByteUtils.read(inputStream, input);
                }
                catch (EOFException e) {
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace((Object)"Incomplete read for message");
                    }
                    StreamRequestHandler.StreamRequestHandlerState streamRequestHandlerState = StreamRequestHandler.StreamRequestHandlerState.INCOMPLETE_READ;
                    Object var10_10 = null;
                    this.stats.recordNetworkTime(this.handle, System.nanoTime() - startNs);
                    return streamRequestHandlerState;
                }
                Object var10_9 = null;
                this.stats.recordNetworkTime(this.handle, System.nanoTime() - startNs);
            }
            catch (Throwable throwable) {
                Object var10_11 = null;
                this.stats.recordNetworkTime(this.handle, System.nanoTime() - startNs);
                throw throwable;
            }
            VAdminProto.UpdatePartitionEntriesRequest.Builder builder = VAdminProto.UpdatePartitionEntriesRequest.newBuilder();
            builder.mergeFrom(input);
            this.request = builder.build();
        }
        if (this.filter.accept(key = ProtoUtils.decodeBytes((partitionEntry = this.request.getPartitionEntry()).getKey()), value = ProtoUtils.decodeVersioned(partitionEntry.getVersioned()))) {
            startNs = System.nanoTime();
            try {
                block20: {
                    try {
                        this.storageEngine.put(key, value, null);
                        if (!this.logger.isTraceEnabled()) break block20;
                        this.logger.trace((Object)"updateEntries (Streaming put) successful");
                    }
                    catch (ObsoleteVersionException e) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug((Object)"updateEntries (Streaming put) threw ObsoleteVersionException, Ignoring.");
                        }
                        Object var12_19 = null;
                        this.stats.recordDiskTime(this.handle, System.nanoTime() - startNs);
                    }
                }
                Object var12_18 = null;
                this.stats.recordDiskTime(this.handle, System.nanoTime() - startNs);
            }
            catch (Throwable throwable) {
                Object var12_20 = null;
                this.stats.recordDiskTime(this.handle, System.nanoTime() - startNs);
                throw throwable;
            }
            this.throttler.maybeThrottle(key.length() + AdminServiceRequestHandler.valueSize(value));
        }
        ++this.counter;
        this.handle.incrementEntriesScanned();
        if (0 == this.counter % 100000) {
            long totalTime = (System.currentTimeMillis() - this.startTime) / 1000L;
            this.logger.info((Object)("Update entries updated " + this.counter + " entries for store '" + this.storageEngine.getName() + "' in " + totalTime + " s"));
        }
        this.request = null;
        return StreamRequestHandler.StreamRequestHandlerState.READING;
    }

    public StreamRequestHandler.StreamRequestDirection getDirection() {
        return StreamRequestHandler.StreamRequestDirection.READING;
    }

    public void close(DataOutputStream outputStream) throws IOException {
        ProtoUtils.writeMessage(outputStream, (Message)this.responseBuilder.build());
    }

    public void handleError(DataOutputStream outputStream, VoldemortException e) throws IOException {
        this.responseBuilder.setError(ProtoUtils.encodeError(this.errorCodeMapper, e));
        if (this.logger.isEnabledFor((Priority)Level.ERROR)) {
            this.logger.error((Object)("handleUpdatePartitionEntries failed for request(" + (Object)((Object)this.request) + ")"), (Throwable)e);
        }
    }
}

