/*
 * Decompiled with CFR 0.152.
 */
package com.gitblit.fanout;

import com.gitblit.fanout.FanoutConstants;
import com.gitblit.fanout.FanoutService;
import com.gitblit.fanout.FanoutServiceConnection;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FanoutNioService
extends FanoutService {
    private static final Logger logger = LoggerFactory.getLogger(FanoutNioService.class);
    private volatile ServerSocketChannel serviceCh;
    private volatile Selector selector;

    public static void main(String[] args) throws Exception {
        FanoutNioService pubsub = new FanoutNioService(null, 17000);
        pubsub.setStrictRequestTermination(false);
        pubsub.setAllowAllChannelAnnouncements(false);
        pubsub.start();
    }

    public FanoutNioService(int port) {
        this(null, port);
    }

    public FanoutNioService(String bindInterface, int port) {
        super(bindInterface, port, "Fanout nio service");
    }

    @Override
    protected boolean isConnected() {
        return this.serviceCh != null;
    }

    @Override
    protected boolean connect() {
        if (this.serviceCh == null) {
            try {
                this.serviceCh = ServerSocketChannel.open();
                this.serviceCh.configureBlocking(false);
                this.serviceCh.socket().setReuseAddress(true);
                this.serviceCh.socket().bind(this.host == null ? new InetSocketAddress(this.port) : new InetSocketAddress(this.host, this.port));
                this.selector = Selector.open();
                this.serviceCh.register(this.selector, 16);
                logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", this.name, this.host == null ? "0.0.0.0" : this.host, this.port));
            }
            catch (IOException e) {
                logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", this.name, this.name, this.host == null ? "0.0.0.0" : this.host, this.port), (Throwable)e);
                return false;
            }
        }
        return true;
    }

    @Override
    protected void disconnect() {
        try {
            if (this.serviceCh != null) {
                Map<String, SocketChannel> clients = this.getCurrentClientSockets();
                for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {
                    this.closeClientSocket(client.getKey(), client.getValue());
                }
                logger.debug(MessageFormat.format("closing {0} socket channel", this.name));
                this.serviceCh.socket().close();
                this.serviceCh.close();
                this.serviceCh = null;
                this.selector.close();
                this.selector = null;
            }
        }
        catch (IOException e) {
            logger.error(MessageFormat.format("failed to disconnect {0}", this.name), (Throwable)e);
        }
    }

    @Override
    protected void listen() throws IOException {
        while (this.selector.select(5000L) > 0) {
            Set<SelectionKey> keys = this.selector.selectedKeys();
            Iterator<SelectionKey> keyItr = keys.iterator();
            while (keyItr.hasNext()) {
                block17: {
                    FanoutNioConnection connection;
                    SocketChannel ch;
                    SelectionKey key = keyItr.next();
                    if (key.isAcceptable()) {
                        ServerSocketChannel sch = (ServerSocketChannel)key.channel();
                        try {
                            SocketChannel ch2 = sch.accept();
                            ch2.configureBlocking(false);
                            this.configureClientSocket(ch2.socket());
                            FanoutNioConnection connection2 = new FanoutNioConnection(ch2);
                            this.addConnection(connection2);
                            ch2.register(this.selector, 4, connection2);
                        }
                        catch (IOException e) {
                            logger.error("error accepting fanout connection", (Throwable)e);
                        }
                    } else if (key.isReadable()) {
                        ch = (SocketChannel)key.channel();
                        connection = (FanoutNioConnection)key.attachment();
                        try {
                            connection.read(ch, this.isStrictRequestTermination());
                            int replies = 0;
                            Iterator<String> reqItr = connection.requestQueue.iterator();
                            while (reqItr.hasNext()) {
                                String req = reqItr.next();
                                String reply = this.processRequest(connection, req);
                                reqItr.remove();
                                if (reply == null) continue;
                                ++replies;
                            }
                            if (replies > 0) {
                                ch.register(this.selector, 4, connection);
                                break block17;
                            }
                            ch.register(this.selector, 1, connection);
                        }
                        catch (IOException e) {
                            logger.error(MessageFormat.format("fanout connection {0} error: {1}", connection.id, e.getMessage()));
                            this.removeConnection(connection);
                            this.closeClientSocket(connection.id, ch);
                        }
                    } else if (key.isWritable()) {
                        ch = (SocketChannel)key.channel();
                        connection = (FanoutNioConnection)key.attachment();
                        try {
                            connection.write(ch);
                            if (this.hasConnection(connection)) {
                                ch.register(this.selector, 1, connection);
                            } else {
                                this.closeClientSocket(connection.id, ch);
                            }
                        }
                        catch (IOException e) {
                            logger.error(MessageFormat.format("fanout connection {0}: {1}", connection.id, e.getMessage()));
                            this.removeConnection(connection);
                            this.closeClientSocket(connection.id, ch);
                        }
                    }
                }
                keyItr.remove();
            }
        }
    }

    protected void closeClientSocket(String id, SocketChannel ch) {
        try {
            ch.close();
        }
        catch (IOException e) {
            logger.error(MessageFormat.format("fanout connection {0}", id), (Throwable)e);
        }
    }

    @Override
    protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
        super.broadcast(connections, channel, message);
        Map<String, SocketChannel> sockets = this.getCurrentClientSockets();
        for (FanoutServiceConnection connection : connections) {
            SocketChannel ch = sockets.get(connection.id);
            if (ch == null) {
                logger.warn(MessageFormat.format("fanout connection {0} has been disconnected", connection.id));
                this.removeConnection(connection);
                continue;
            }
            try {
                ch.register(this.selector, 4, connection);
            }
            catch (IOException e) {
                logger.error(MessageFormat.format("failed to register write op for fanout connection {0}", connection.id));
            }
        }
    }

    protected Map<String, SocketChannel> getCurrentClientSockets() {
        HashMap<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();
        for (SelectionKey key : this.selector.keys()) {
            if (!(key.channel() instanceof SocketChannel)) continue;
            SocketChannel ch = (SocketChannel)key.channel();
            String id = FanoutConstants.getRemoteSocketId(ch.socket());
            sockets.put(id, ch);
        }
        return sockets;
    }

    static class FanoutNioConnection
    extends FanoutServiceConnection {
        final ByteBuffer readBuffer = ByteBuffer.allocate(512);
        final ByteBuffer writeBuffer = ByteBuffer.allocate(512);
        final List<String> requestQueue = new ArrayList<String>();
        final List<String> replyQueue = new ArrayList<String>();
        final CharsetDecoder decoder = Charset.forName("ISO-8859-1").newDecoder();

        FanoutNioConnection(SocketChannel ch) {
            super(ch.socket());
        }

        protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {
            long bytesRead = 0L;
            this.readBuffer.clear();
            bytesRead = ch.read(this.readBuffer);
            this.readBuffer.flip();
            if (bytesRead == -1L) {
                throw new IOException("lost client connection, end of stream");
            }
            if (this.readBuffer.limit() == 0) {
                return;
            }
            CharBuffer cbuf = this.decoder.decode(this.readBuffer);
            String req = cbuf.toString();
            String[] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");
            this.requestQueue.addAll(Arrays.asList(lines));
        }

        protected void write(SocketChannel ch) throws IOException {
            Iterator<String> itr = this.replyQueue.iterator();
            while (itr.hasNext()) {
                String reply = itr.next();
                this.writeBuffer.clear();
                logger.debug(MessageFormat.format("fanout reply to {0}: {1}", this.id, reply));
                byte[] bytes = reply.getBytes("ISO-8859-1");
                this.writeBuffer.put(bytes);
                if (bytes[bytes.length - 1] != 10) {
                    this.writeBuffer.put((byte)10);
                }
                this.writeBuffer.flip();
                int written = 0;
                int toWrite = this.writeBuffer.remaining();
                while (written != toWrite) {
                    written += ch.write(this.writeBuffer);
                    try {
                        Thread.sleep(10L);
                    }
                    catch (Exception exception) {}
                }
                itr.remove();
            }
            this.writeBuffer.clear();
        }

        @Override
        protected void reply(String content) throws IOException {
            this.replyQueue.add(content);
        }
    }
}

