package reactor.net.netty.tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.composable.spec.Streams;
import reactor.function.Consumer;
import reactor.function.Supplier;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.net.AbstractNetChannel;
import reactor.net.NetChannel;
import reactor.net.Reconnect;
import reactor.net.config.ClientSocketOptions;
import reactor.net.config.SslOptions;
import reactor.net.netty.NettyClientSocketOptions;
import reactor.net.netty.NettyEventLoopDispatcher;
import reactor.net.netty.NettyNetChannel;
import reactor.net.netty.NettyNetChannelInboundHandler;
import reactor.net.netty.NettyNetChannelOutboundHandler;
import reactor.net.tcp.TcpClient;
import reactor.net.tcp.ssl.SSLEngineSupplier;
import reactor.support.NamedDaemonThreadFactory;
import reactor.tuple.Tuple2;

/* loaded from: input_file:reactor/net/netty/tcp/NettyTcpClient.class */
public class NettyTcpClient<IN, OUT> extends TcpClient<IN, OUT> {
    private final Logger log;
    private final Bootstrap bootstrap;
    private final EventLoopGroup ioGroup;
    private final Supplier<ChannelFuture> connectionSupplier;
    private volatile InetSocketAddress connectAddress;
    private volatile boolean closing;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/net/netty/tcp/NettyTcpClient$ConnectingChannelListener.class */
    public class ConnectingChannelListener implements ChannelFutureListener {
        private final Deferred<NetChannel<IN, OUT>, Promise<NetChannel<IN, OUT>>> connection;

        private ConnectingChannelListener(Deferred<NetChannel<IN, OUT>, Promise<NetChannel<IN, OUT>>> deferred) {
            this.connection = deferred;
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (!channelFuture.isSuccess()) {
                if (NettyTcpClient.this.log.isErrorEnabled()) {
                    NettyTcpClient.this.log.error(channelFuture.cause().getMessage(), channelFuture.cause());
                }
                this.connection.accept(channelFuture.cause());
            } else {
                if (NettyTcpClient.this.log.isInfoEnabled()) {
                    NettyTcpClient.this.log.info("CONNECTED: " + channelFuture.channel());
                }
                final AbstractNetChannel netChannel = channelFuture.channel().pipeline().get(NettyNetChannelInboundHandler.class).getNetChannel();
                channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() { // from class: reactor.net.netty.tcp.NettyTcpClient.ConnectingChannelListener.1
                    public void operationComplete(ChannelFuture channelFuture2) throws Exception {
                        if (NettyTcpClient.this.log.isInfoEnabled()) {
                            NettyTcpClient.this.log.info("CLOSED: " + channelFuture2.channel());
                        }
                        NettyTcpClient.this.notifyClose(netChannel);
                    }
                });
                channelFuture.channel().eventLoop().submit(new Runnable() { // from class: reactor.net.netty.tcp.NettyTcpClient.ConnectingChannelListener.2
                    @Override // java.lang.Runnable
                    public void run() {
                        ConnectingChannelListener.this.connection.accept(netChannel);
                    }
                });
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/net/netty/tcp/NettyTcpClient$ReconnectingChannelListener.class */
    public class ReconnectingChannelListener implements ChannelFutureListener {
        private final AtomicInteger attempts;
        private final Reconnect reconnect;
        private final Deferred<NetChannel<IN, OUT>, Stream<NetChannel<IN, OUT>>> connections;
        private volatile InetSocketAddress connectAddress;

        private ReconnectingChannelListener(InetSocketAddress inetSocketAddress, Reconnect reconnect, Deferred<NetChannel<IN, OUT>, Stream<NetChannel<IN, OUT>>> deferred) {
            this.attempts = new AtomicInteger(0);
            this.connectAddress = inetSocketAddress;
            this.reconnect = reconnect;
            this.connections = deferred;
        }

        public void operationComplete(final ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                if (NettyTcpClient.this.log.isInfoEnabled()) {
                    NettyTcpClient.this.log.info("CONNECTED: " + channelFuture.channel());
                }
                final Channel channel = channelFuture.channel();
                ChannelPipeline pipeline = channel.pipeline();
                final AbstractNetChannel netChannel = pipeline.get(NettyNetChannelInboundHandler.class).getNetChannel();
                pipeline.addLast(new ChannelHandler[]{new ChannelDuplexHandler() { // from class: reactor.net.netty.tcp.NettyTcpClient.ReconnectingChannelListener.2
                    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
                        if (NettyTcpClient.this.log.isInfoEnabled()) {
                            NettyTcpClient.this.log.info("CLOSED: " + channel);
                        }
                        NettyTcpClient.this.notifyClose(netChannel);
                        Tuple2<InetSocketAddress, Long> reconnect = ReconnectingChannelListener.this.reconnect.reconnect(ReconnectingChannelListener.this.connectAddress, ReconnectingChannelListener.this.attempts.incrementAndGet());
                        if (null == reconnect) {
                            return;
                        }
                        if (((NettyNetChannel) netChannel).isClosing()) {
                            NettyTcpClient.this.closing = true;
                        } else {
                            ReconnectingChannelListener.this.attemptReconnect(reconnect);
                        }
                        super.channelInactive(channelHandlerContext);
                    }
                }});
                channel.eventLoop().submit(new Runnable() { // from class: reactor.net.netty.tcp.NettyTcpClient.ReconnectingChannelListener.3
                    @Override // java.lang.Runnable
                    public void run() {
                        ReconnectingChannelListener.this.connections.accept(netChannel);
                    }
                });
                return;
            }
            int incrementAndGet = this.attempts.incrementAndGet();
            Tuple2<InetSocketAddress, Long> reconnect = this.reconnect.reconnect(this.connectAddress, incrementAndGet);
            if (null != reconnect) {
                attemptReconnect(reconnect);
                return;
            }
            if (NettyTcpClient.this.log.isErrorEnabled()) {
                NettyTcpClient.this.log.error("Reconnection to {} failed after {} attempts. Giving up.", this.connectAddress, Integer.valueOf(incrementAndGet - 1));
            }
            channelFuture.channel().eventLoop().submit(new Runnable() { // from class: reactor.net.netty.tcp.NettyTcpClient.ReconnectingChannelListener.1
                @Override // java.lang.Runnable
                public void run() {
                    ReconnectingChannelListener.this.connections.accept(channelFuture.cause());
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void attemptReconnect(Tuple2<InetSocketAddress, Long> tuple2) {
            this.connectAddress = (InetSocketAddress) tuple2.getT1();
            NettyTcpClient.this.bootstrap.remoteAddress(this.connectAddress);
            long longValue = ((Long) tuple2.getT2()).longValue();
            if (NettyTcpClient.this.log.isInfoEnabled()) {
                NettyTcpClient.this.log.info("Failed to connect to {}. Attempting reconnect in {}ms.", this.connectAddress, Long.valueOf(longValue));
            }
            NettyTcpClient.this.getEnvironment().getRootTimer().submit(new Consumer<Long>() { // from class: reactor.net.netty.tcp.NettyTcpClient.ReconnectingChannelListener.4
                public void accept(Long l) {
                    NettyTcpClient.this.openChannel(ReconnectingChannelListener.this);
                }
            }, longValue, TimeUnit.MILLISECONDS).cancelAfterUse();
        }
    }

    public NettyTcpClient(@Nonnull Environment environment, @Nonnull Reactor reactor2, @Nonnull InetSocketAddress inetSocketAddress, @Nonnull final ClientSocketOptions clientSocketOptions, @Nullable final SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec, @Nonnull Collection<Consumer<NetChannel<IN, OUT>>> collection) {
        super(environment, reactor2, inetSocketAddress, clientSocketOptions, sslOptions, codec, collection);
        this.log = LoggerFactory.getLogger(NettyTcpClient.class);
        this.connectAddress = inetSocketAddress;
        this.ioGroup = new NioEventLoopGroup(((Integer) environment.getProperty("reactor.tcp.ioThreadCount", Integer.class, Integer.valueOf(Environment.PROCESSORS))).intValue(), new NamedDaemonThreadFactory("reactor-tcp-io"));
        this.bootstrap = new Bootstrap().group(this.ioGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_RCVBUF, Integer.valueOf(clientSocketOptions.rcvbuf())).option(ChannelOption.SO_SNDBUF, Integer.valueOf(clientSocketOptions.sndbuf())).option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(clientSocketOptions.keepAlive())).option(ChannelOption.SO_LINGER, Integer.valueOf(clientSocketOptions.linger())).option(ChannelOption.TCP_NODELAY, Boolean.valueOf(clientSocketOptions.tcpNoDelay())).remoteAddress(this.connectAddress).handler(new ChannelInitializer<SocketChannel>() { // from class: reactor.net.netty.tcp.NettyTcpClient.1
            public void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.config().setConnectTimeoutMillis(clientSocketOptions.timeout());
                if (null != sslOptions) {
                    SSLEngine m5get = new SSLEngineSupplier(sslOptions, true).m5get();
                    if (NettyTcpClient.this.log.isDebugEnabled()) {
                        NettyTcpClient.this.log.debug("SSL enabled using keystore {}", null != sslOptions.keystoreFile() ? sslOptions.keystoreFile() : "<DEFAULT>");
                    }
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new SslHandler(m5get)});
                }
                if ((clientSocketOptions instanceof NettyClientSocketOptions) && null != ((NettyClientSocketOptions) clientSocketOptions).pipelineConfigurer()) {
                    ((NettyClientSocketOptions) clientSocketOptions).pipelineConfigurer().accept(socketChannel.pipeline());
                }
                socketChannel.pipeline().addLast(NettyTcpClient.this.createChannelHandlers(socketChannel));
            }
        });
        this.connectionSupplier = new Supplier<ChannelFuture>() { // from class: reactor.net.netty.tcp.NettyTcpClient.2
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public ChannelFuture m1get() {
                if (NettyTcpClient.this.closing) {
                    return null;
                }
                return NettyTcpClient.this.bootstrap.connect(NettyTcpClient.this.getConnectAddress());
            }
        };
    }

    @Override // reactor.net.tcp.TcpClient, reactor.net.NetClient
    public Promise<NetChannel<IN, OUT>> open() {
        Deferred defer = Promises.defer(getEnvironment(), getReactor().getDispatcher());
        openChannel(new ConnectingChannelListener(defer));
        return defer.compose();
    }

    @Override // reactor.net.tcp.TcpClient, reactor.net.NetClient
    public Stream<NetChannel<IN, OUT>> open(Reconnect reconnect) {
        Deferred defer = Streams.defer(getEnvironment(), getReactor().getDispatcher());
        openChannel(new ReconnectingChannelListener(this.connectAddress, reconnect, defer));
        return defer.compose();
    }

    @Override // reactor.net.AbstractNetPeer, reactor.net.NetClient
    public void close(@Nullable final Consumer<Boolean> consumer) {
        this.ioGroup.shutdownGracefully().addListener(new FutureListener<Object>() { // from class: reactor.net.netty.tcp.NettyTcpClient.3
            public void operationComplete(Future<Object> future) throws Exception {
                if (null != consumer) {
                    consumer.accept(Boolean.valueOf(future.isDone() && future.isSuccess()));
                }
            }
        });
    }

    @Override // reactor.net.AbstractNetPeer
    protected <C> NetChannel<IN, OUT> createChannel(C c) {
        SocketChannel socketChannel = (SocketChannel) c;
        return new NettyNetChannel(getEnvironment(), getCodec(), new NettyEventLoopDispatcher(socketChannel.eventLoop(), ((Integer) getEnvironment().getProperty("reactor.tcp.connectionReactorBacklog", Integer.class, 128)).intValue()), getReactor(), socketChannel);
    }

    protected ChannelHandler[] createChannelHandlers(SocketChannel socketChannel) {
        return new ChannelHandler[]{new NettyNetChannelInboundHandler().setNetChannel((NettyNetChannel) createChannel(socketChannel)), new NettyNetChannelOutboundHandler()};
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void openChannel(ChannelFutureListener channelFutureListener) {
        ChannelFuture channelFuture = (ChannelFuture) this.connectionSupplier.get();
        if (null == channelFuture || null == channelFutureListener) {
            return;
        }
        channelFuture.addListener(channelFutureListener);
    }
}
