/*
 * Decompiled with CFR 0.152.
 */
package voldemort.store.socket.clientrequest;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.clientrequest.BlockingClientRequest;
import voldemort.store.socket.clientrequest.ClientRequestExecutor;
import voldemort.store.socket.clientrequest.ProtocolNegotiatorClientRequest;
import voldemort.store.stats.ClientSocketStats;
import voldemort.utils.DaemonThreadFactory;
import voldemort.utils.SelectorManager;
import voldemort.utils.pool.ResourceFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class ClientRequestExecutorFactory
implements ResourceFactory<SocketDestination, ClientRequestExecutor> {
    private static final int SHUTDOWN_TIMEOUT_MS = 15000;
    private final int connectTimeoutMs;
    private final int soTimeoutMs;
    private final int socketBufferSize;
    private final AtomicInteger created;
    private final AtomicInteger destroyed;
    private final boolean socketKeepAlive;
    private final ClientRequestSelectorManager[] selectorManagers;
    private final ExecutorService selectorManagerThreadPool;
    private final AtomicInteger counter = new AtomicInteger();
    private final Map<SocketDestination, Long> lastClosedTimestamps;
    private final Logger logger = Logger.getLogger(this.getClass());
    private final ClientSocketStats stats;

    public ClientRequestExecutorFactory(int selectors, int connectTimeoutMs, int soTimeoutMs, int socketBufferSize, boolean socketKeepAlive, ClientSocketStats stats) {
        this.connectTimeoutMs = connectTimeoutMs;
        this.soTimeoutMs = soTimeoutMs;
        this.created = new AtomicInteger(0);
        this.destroyed = new AtomicInteger(0);
        this.socketBufferSize = socketBufferSize;
        this.socketKeepAlive = socketKeepAlive;
        this.stats = stats;
        this.selectorManagers = new ClientRequestSelectorManager[selectors];
        this.selectorManagerThreadPool = Executors.newFixedThreadPool(this.selectorManagers.length, new DaemonThreadFactory("voldemort-niosocket-client-"));
        for (int i = 0; i < this.selectorManagers.length; ++i) {
            this.selectorManagers[i] = new ClientRequestSelectorManager();
            this.selectorManagerThreadPool.execute(this.selectorManagers[i]);
        }
        this.lastClosedTimestamps = new ConcurrentHashMap<SocketDestination, Long>();
    }

    @Override
    public void destroy(SocketDestination dest, ClientRequestExecutor clientRequestExecutor) throws Exception {
        clientRequestExecutor.close();
        int numDestroyed = this.destroyed.incrementAndGet();
        if (this.stats != null) {
            this.stats.connectionDestroy(dest);
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Destroyed socket " + numDestroyed + " connection to " + dest.getHost() + ":" + dest.getPort()));
        }
    }

    @Override
    public ClientRequestExecutor create(SocketDestination dest) throws Exception {
        int numCreated = this.created.incrementAndGet();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Creating socket " + numCreated + " for " + dest.getHost() + ":" + dest.getPort() + " using protocol " + dest.getRequestFormatType().getCode()));
        }
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.socket().setReceiveBufferSize(this.socketBufferSize);
        socketChannel.socket().setSendBufferSize(this.socketBufferSize);
        socketChannel.socket().setTcpNoDelay(true);
        socketChannel.socket().setSoTimeout(this.soTimeoutMs);
        socketChannel.socket().setKeepAlive(this.socketKeepAlive);
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress(dest.getHost(), dest.getPort()));
        long startTime = System.currentTimeMillis();
        long duration = 0L;
        long currWaitTime = 1L;
        long prevWaitTime = 1L;
        while (!socketChannel.finishConnect()) {
            duration = System.currentTimeMillis() - startTime;
            long remaining = (long)this.connectTimeoutMs - duration;
            if (remaining < 0L) {
                block16: {
                    try {
                        socketChannel.close();
                    }
                    catch (Exception e) {
                        if (!this.logger.isEnabledFor((Priority)Level.WARN)) break block16;
                        this.logger.warn((Object)e, (Throwable)e);
                    }
                }
                throw new ConnectException("Cannot connect socket " + numCreated + " for " + dest.getHost() + ":" + dest.getPort() + " after " + duration + " ms");
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace((Object)("Still creating socket " + numCreated + " for " + dest.getHost() + ":" + dest.getPort() + ", " + remaining + " ms. remaining to connect"));
            }
            try {
                Thread.sleep(Math.min(remaining, currWaitTime));
                currWaitTime = Math.min(currWaitTime + prevWaitTime, 50L);
                prevWaitTime = currWaitTime - prevWaitTime;
            }
            catch (InterruptedException e) {
                if (!this.logger.isEnabledFor((Priority)Level.WARN)) continue;
                this.logger.warn((Object)e, (Throwable)e);
            }
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Created socket " + numCreated + " for " + dest.getHost() + ":" + dest.getPort() + " using protocol " + dest.getRequestFormatType().getCode() + " after " + duration + " ms."));
        }
        if (socketChannel.socket().getReceiveBufferSize() != this.socketBufferSize) {
            this.logger.debug((Object)("Requested socket receive buffer size was " + this.socketBufferSize + " bytes but actual size is " + socketChannel.socket().getReceiveBufferSize() + " bytes."));
        }
        if (socketChannel.socket().getSendBufferSize() != this.socketBufferSize) {
            this.logger.debug((Object)("Requested socket send buffer size was " + this.socketBufferSize + " bytes but actual size is " + socketChannel.socket().getSendBufferSize() + " bytes."));
        }
        ClientRequestSelectorManager selectorManager = this.selectorManagers[this.counter.getAndIncrement() % this.selectorManagers.length];
        Selector selector = selectorManager.getSelector();
        ClientRequestExecutor clientRequestExecutor = new ClientRequestExecutor(selector, socketChannel, this.socketBufferSize);
        BlockingClientRequest<String> clientRequest = new BlockingClientRequest<String>(new ProtocolNegotiatorClientRequest(dest.getRequestFormatType()), this.getTimeout());
        clientRequestExecutor.addClientRequest(clientRequest);
        selectorManager.registrationQueue.add(clientRequestExecutor);
        selector.wakeup();
        clientRequest.await();
        try {
            clientRequest.getResult();
        }
        catch (Exception e) {
            block17: {
                try {
                    socketChannel.close();
                }
                catch (Exception ex) {
                    if (!this.logger.isEnabledFor((Priority)Level.WARN)) break block17;
                    this.logger.warn((Object)ex, (Throwable)ex);
                }
            }
            throw e;
        }
        if (this.stats != null) {
            this.stats.connectionCreate(dest);
        }
        return clientRequestExecutor;
    }

    @Override
    public boolean validate(SocketDestination dest, ClientRequestExecutor clientRequestExecutor) {
        long lastClosedTimestamp = this.getLastClosedTimestamp(dest);
        if (clientRequestExecutor.getCreateTimestamp() <= lastClosedTimestamp) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Socket connection " + clientRequestExecutor + " was created on " + new Date(clientRequestExecutor.getCreateTimestamp() / 1000000L) + " before socket pool was closed and re-created (on " + new Date(lastClosedTimestamp / 1000000L) + ")"));
            }
            return false;
        }
        boolean isValid = clientRequestExecutor.isValid();
        if (!isValid && this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Client request executor connection " + clientRequestExecutor + " is no longer valid, closing."));
        }
        return isValid;
    }

    public int getTimeout() {
        return this.soTimeoutMs;
    }

    public int getNumberCreated() {
        return this.created.get();
    }

    public int getNumberDestroyed() {
        return this.destroyed.get();
    }

    @Override
    public void close() {
        block7: {
            try {
                boolean terminated;
                for (int i = 0; i < this.selectorManagers.length; ++i) {
                    try {
                        this.selectorManagers[i].close();
                        continue;
                    }
                    catch (Exception e) {
                        if (!this.logger.isEnabledFor((Priority)Level.WARN)) continue;
                        this.logger.warn((Object)e.getMessage(), (Throwable)e);
                    }
                }
                this.selectorManagerThreadPool.shutdown();
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace((Object)"Shut down SelectorManager thread pool acceptor, waiting 15000 ms for termination");
                }
                if (!(terminated = this.selectorManagerThreadPool.awaitTermination(15000L, TimeUnit.MILLISECONDS)) && this.logger.isEnabledFor((Priority)Level.WARN)) {
                    this.logger.warn((Object)"SelectorManager thread pool did not stop cleanly after 15000 ms");
                }
            }
            catch (Exception e) {
                if (!this.logger.isEnabledFor((Priority)Level.WARN)) break block7;
                this.logger.warn((Object)e.getMessage(), (Throwable)e);
            }
        }
    }

    private long getLastClosedTimestamp(SocketDestination socketDestination) {
        Long lastClosedTimestamp = this.lastClosedTimestamps.get(socketDestination);
        return lastClosedTimestamp != null ? lastClosedTimestamp : 0L;
    }

    public void setLastClosedTimestamp(SocketDestination socketDestination) {
        this.lastClosedTimestamps.put(socketDestination, System.nanoTime());
    }

    private class ClientRequestSelectorManager
    extends SelectorManager {
        private final Queue<ClientRequestExecutor> registrationQueue = new ConcurrentLinkedQueue<ClientRequestExecutor>();

        private ClientRequestSelectorManager() {
        }

        public Selector getSelector() {
            return this.selector;
        }

        protected void processEvents() {
            block16: {
                block15: {
                    try {
                        ClientRequestExecutor clientRequestExecutor = null;
                        while ((clientRequestExecutor = this.registrationQueue.poll()) != null) {
                            if (this.isClosed.get()) {
                                if (this.logger.isDebugEnabled()) {
                                    this.logger.debug((Object)"Closed, exiting");
                                }
                                break;
                            }
                            SocketChannel socketChannel = clientRequestExecutor.getSocketChannel();
                            try {
                                if (this.logger.isDebugEnabled()) {
                                    this.logger.debug((Object)("Registering connection from " + socketChannel.socket()));
                                }
                                socketChannel.register(this.selector, 4, clientRequestExecutor);
                            }
                            catch (ClosedSelectorException e) {
                                if (this.logger.isDebugEnabled()) {
                                    this.logger.debug((Object)"Selector is closed, exiting");
                                }
                                this.close();
                                break;
                            }
                            catch (Exception e) {
                                if (!this.logger.isEnabledFor((Priority)Level.ERROR)) continue;
                                this.logger.error((Object)e.getMessage(), (Throwable)e);
                            }
                        }
                    }
                    catch (Exception e) {
                        if (!this.logger.isEnabledFor((Priority)Level.ERROR)) break block15;
                        this.logger.error((Object)e.getMessage(), (Throwable)e);
                    }
                }
                try {
                    for (SelectionKey selectionKey : this.selector.keys()) {
                        ClientRequestExecutor clientRequestExecutor = (ClientRequestExecutor)selectionKey.attachment();
                        if (clientRequestExecutor == null) continue;
                        try {
                            clientRequestExecutor.checkTimeout(selectionKey);
                        }
                        catch (Exception e) {
                            if (!this.logger.isEnabledFor((Priority)Level.ERROR)) continue;
                            this.logger.error((Object)e.getMessage(), (Throwable)e);
                        }
                    }
                }
                catch (Exception e) {
                    if (!this.logger.isEnabledFor((Priority)Level.ERROR)) break block16;
                    this.logger.error((Object)e.getMessage(), (Throwable)e);
                }
            }
        }
    }
}

