/*
 * Decompiled with CFR 0.152.
 */
package com.gitblit.fanout;

import com.gitblit.fanout.FanoutServiceConnection;
import com.gitblit.fanout.FanoutStats;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FanoutService
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(FanoutService.class);
    public static final int DEFAULT_PORT = 17000;
    protected static final int serviceTimeout = 5000;
    protected final String host;
    protected final int port;
    protected final String name;
    private Thread serviceThread;
    private final Map<String, FanoutServiceConnection> connections;
    private final Map<String, Set<FanoutServiceConnection>> subscriptions;
    protected final AtomicBoolean isRunning;
    private final AtomicBoolean strictRequestTermination;
    private final AtomicBoolean allowAllChannelAnnouncements;
    private final AtomicInteger concurrentConnectionLimit;
    private final Date bootDate;
    private final AtomicLong rejectedConnectionCount;
    private final AtomicInteger peakConnectionCount;
    private final AtomicLong totalConnections;
    private final AtomicLong totalAnnouncements;
    private final AtomicLong totalMessages;
    private final AtomicLong totalSubscribes;
    private final AtomicLong totalUnsubscribes;
    private final AtomicLong totalPings;

    protected FanoutService(String host, int port, String name) {
        this.host = host;
        this.port = port;
        this.name = name;
        this.connections = new ConcurrentHashMap<String, FanoutServiceConnection>();
        this.subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();
        this.subscriptions.put("all", new ConcurrentSkipListSet());
        this.isRunning = new AtomicBoolean(false);
        this.strictRequestTermination = new AtomicBoolean(false);
        this.allowAllChannelAnnouncements = new AtomicBoolean(false);
        this.concurrentConnectionLimit = new AtomicInteger(0);
        this.bootDate = new Date();
        this.rejectedConnectionCount = new AtomicLong(0L);
        this.peakConnectionCount = new AtomicInteger(0);
        this.totalConnections = new AtomicLong(0L);
        this.totalAnnouncements = new AtomicLong(0L);
        this.totalMessages = new AtomicLong(0L);
        this.totalSubscribes = new AtomicLong(0L);
        this.totalUnsubscribes = new AtomicLong(0L);
        this.totalPings = new AtomicLong(0L);
    }

    protected abstract boolean isConnected();

    protected abstract boolean connect();

    protected abstract void listen() throws IOException;

    protected abstract void disconnect();

    public boolean isStrictRequestTermination() {
        return this.strictRequestTermination.get();
    }

    public void setStrictRequestTermination(boolean isStrictTermination) {
        this.strictRequestTermination.set(isStrictTermination);
    }

    public int getConcurrentConnectionLimit() {
        return this.concurrentConnectionLimit.get();
    }

    public void setConcurrentConnectionLimit(int value) {
        this.concurrentConnectionLimit.set(value);
    }

    public boolean allowAllChannelAnnouncements() {
        return this.allowAllChannelAnnouncements.get();
    }

    public void setAllowAllChannelAnnouncements(boolean value) {
        this.allowAllChannelAnnouncements.set(value);
    }

    public Map<String, FanoutServiceConnection> getCurrentConnections() {
        return this.connections;
    }

    public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {
        return this.subscriptions;
    }

    public Set<FanoutServiceConnection> getCurrentSubscriptions(String channel) {
        return this.subscriptions.get(channel);
    }

    public FanoutStats getStatistics() {
        FanoutStats stats = new FanoutStats();
        stats.allowAllChannelAnnouncements = this.allowAllChannelAnnouncements();
        stats.concurrentConnectionLimit = this.getConcurrentConnectionLimit();
        stats.strictRequestTermination = this.isStrictRequestTermination();
        stats.bootDate = this.bootDate;
        stats.rejectedConnectionCount = this.rejectedConnectionCount.get();
        stats.peakConnectionCount = this.peakConnectionCount.get();
        stats.totalConnections = this.totalConnections.get();
        stats.totalAnnouncements = this.totalAnnouncements.get();
        stats.totalMessages = this.totalMessages.get();
        stats.totalSubscribes = this.totalSubscribes.get();
        stats.totalUnsubscribes = this.totalUnsubscribes.get();
        stats.totalPings = this.totalPings.get();
        stats.currentConnections = this.connections.size();
        stats.currentChannels = this.subscriptions.size();
        stats.currentSubscriptions = this.subscriptions.size() * this.connections.size();
        return stats;
    }

    public boolean isReady() {
        if (this.isRunning.get()) {
            return this.isConnected();
        }
        return false;
    }

    public void start() {
        if (this.isRunning.get()) {
            logger.warn(MessageFormat.format("{0} is already running", this.name));
            return;
        }
        this.serviceThread = new Thread(this);
        this.serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", this.name, this.host == null ? "all" : this.host, this.port));
        this.serviceThread.start();
    }

    public void startSynchronously() {
        this.start();
        while (!this.isReady()) {
            try {
                Thread.sleep(100L);
            }
            catch (Exception exception) {}
        }
    }

    public void stop() {
        if (!this.isRunning.get()) {
            logger.warn(MessageFormat.format("{0} is not running", this.name));
            return;
        }
        logger.info(MessageFormat.format("stopping {0}...", this.name));
        this.isRunning.set(false);
        try {
            if (this.serviceThread != null) {
                this.serviceThread.join();
                this.serviceThread = null;
            }
        }
        catch (InterruptedException e1) {
            logger.error("", (Throwable)e1);
        }
        logger.info(MessageFormat.format("stopped {0}", this.name));
    }

    @Override
    public final void run() {
        this.disconnect();
        this.resetState();
        this.isRunning.set(true);
        while (this.isRunning.get()) {
            if (this.connect()) {
                try {
                    this.listen();
                }
                catch (IOException e) {
                    logger.error(MessageFormat.format("error processing {0}", this.name), (Throwable)e);
                    this.isRunning.set(false);
                }
                continue;
            }
            try {
                Thread.sleep(5000L);
            }
            catch (InterruptedException interruptedException) {}
        }
        this.disconnect();
        this.resetState();
    }

    protected void resetState() {
        this.connections.clear();
        this.subscriptions.clear();
        this.rejectedConnectionCount.set(0L);
        this.peakConnectionCount.set(0);
        this.totalConnections.set(0L);
        this.totalAnnouncements.set(0L);
        this.totalMessages.set(0L);
        this.totalSubscribes.set(0L);
        this.totalUnsubscribes.set(0L);
        this.totalPings.set(0L);
    }

    protected void configureClientSocket(Socket socket) throws SocketException {
        socket.setKeepAlive(true);
        socket.setSoLinger(true, 0);
    }

    protected boolean addConnection(FanoutServiceConnection connection) {
        int limit = this.getConcurrentConnectionLimit();
        if (limit > 0 && this.connections.size() > limit) {
            logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", this.concurrentConnectionLimit));
            this.increment(this.rejectedConnectionCount);
            connection.busy();
            return false;
        }
        this.connections.put(connection.id, connection);
        if (this.connections.size() > this.peakConnectionCount.get()) {
            this.peakConnectionCount.set(this.connections.size());
        }
        logger.info("fanout new connection " + connection.id);
        connection.connected();
        return true;
    }

    protected void removeConnection(FanoutServiceConnection connection) {
        this.connections.remove(connection.id);
        Iterator<Map.Entry<String, Set<FanoutServiceConnection>>> itr = this.subscriptions.entrySet().iterator();
        while (itr.hasNext()) {
            Map.Entry<String, Set<FanoutServiceConnection>> entry = itr.next();
            Set<FanoutServiceConnection> subscriptions = entry.getValue();
            subscriptions.remove(connection);
            if ("all".equals(entry.getKey()) || subscriptions.size() != 0) continue;
            itr.remove();
            logger.info(MessageFormat.format("fanout remove channel {0}, no subscribers", entry.getKey()));
        }
        logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));
    }

    protected boolean hasConnection(FanoutServiceConnection connection) {
        return this.connections.containsKey(connection.id);
    }

    protected String reply(FanoutServiceConnection connection, String channel, String message) {
        if (channel != null && channel.length() > 0) {
            this.increment(this.totalMessages);
        }
        return connection.reply(channel, message);
    }

    public void broadcastAll(String message) {
        this.broadcast(this.connections.values(), "all", message);
        this.increment(this.totalAnnouncements);
    }

    public void broadcast(String channel, String message) {
        ArrayList<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>((Collection)this.subscriptions.get(channel));
        this.broadcast(connections, channel, message);
        this.increment(this.totalAnnouncements);
    }

    protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
        for (FanoutServiceConnection connection : connections) {
            this.reply(connection, channel, message);
        }
    }

    protected String processRequest(FanoutServiceConnection connection, String req) {
        logger.info(MessageFormat.format("fanout request from {0}: {1}", connection.id, req));
        String[] fields = req.split(" ", 3);
        String action = fields[0];
        String channel = fields.length >= 2 ? fields[1] : null;
        String message = fields.length >= 3 ? fields[2] : null;
        try {
            return this.processRequest(connection, action, channel, message);
        }
        catch (IllegalArgumentException e) {
            logger.error(MessageFormat.format("fanout connection {0} requested invalid action {1}", connection.id, action));
            logger.error(this.asHexArray(req));
            return null;
        }
    }

    protected String processRequest(FanoutServiceConnection connection, String action, String channel, String message) throws IllegalArgumentException {
        if ("ping".equals(action)) {
            this.increment(this.totalPings);
            return this.reply(connection, null, "" + System.currentTimeMillis());
        }
        if ("info".equals(action)) {
            String info = this.getStatistics().info();
            return this.reply(connection, null, info);
        }
        if ("announce".equals(action)) {
            if (!this.allowAllChannelAnnouncements.get() && "all".equals(channel)) {
                logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on ALL channel", connection.id, message));
            } else if ("debug".equals(channel)) {
                logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on DEBUG channel", connection.id, message));
            } else {
                ArrayList<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>((Collection)this.subscriptions.get(channel));
                connections.remove(connection);
                this.broadcast(connections, channel, message);
                this.increment(this.totalAnnouncements);
            }
        } else if ("subscribe".equals(action)) {
            if (!this.subscriptions.containsKey(channel)) {
                logger.info(MessageFormat.format("fanout new channel {0}", channel));
                this.subscriptions.put(channel, new ConcurrentSkipListSet());
            }
            this.subscriptions.get(channel).add(connection);
            logger.debug(MessageFormat.format("fanout connection {0} subscribed to channel {1}", connection.id, channel));
            this.increment(this.totalSubscribes);
        } else if ("unsubscribe".equals(action)) {
            if (this.subscriptions.containsKey(channel)) {
                this.subscriptions.get(channel).remove(connection);
                if (this.subscriptions.get(channel).size() == 0) {
                    this.subscriptions.remove(channel);
                }
                this.increment(this.totalUnsubscribes);
            }
        } else {
            throw new IllegalArgumentException(action);
        }
        return null;
    }

    private String asHexArray(String req) {
        StringBuilder sb = new StringBuilder();
        for (char c : req.toCharArray()) {
            sb.append(Integer.toHexString(c)).append(' ');
        }
        return "[ " + sb.toString().trim() + " ]";
    }

    private void increment(AtomicLong counter) {
        long v = counter.incrementAndGet();
        if (v < 0L) {
            counter.set(0L);
        }
    }

    public String toString() {
        return this.name;
    }
}

