package net.dongliu.prettypb.rpc.client;

import com.google.common.util.concurrent.ListenableFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import net.dongliu.prettypb.rpc.common.PeerInfo;
import net.dongliu.prettypb.rpc.exception.ServiceException;
import net.dongliu.prettypb.rpc.listener.TcpConnectionEventListener;
import net.dongliu.prettypb.rpc.protocol.ConnectRequest;
import net.dongliu.prettypb.rpc.protocol.ConnectResponse;
import net.dongliu.prettypb.rpc.protocol.RpcRequest;
import net.dongliu.prettypb.rpc.protocol.ServerInfo;
import net.dongliu.prettypb.rpc.protocol.WirePayload;
import net.dongliu.prettypb.rpc.utils.Handlers;
import net.dongliu.prettypb.runtime.ExtensionRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/dongliu/prettypb/rpc/client/RpcClientChannel.class */
public class RpcClientChannel implements AutoCloseable {
    private volatile PeerInfo clientPeer;
    private volatile PeerInfo serverPeer;
    private volatile boolean useCompress;
    private Channel channel;
    private final ServerInfo serverInfo;
    private final Bootstrap bootstrap;
    private final int connectCorrelationId;
    private final boolean tryUseCompress;
    private final int connectTimeout;
    private final List<TcpConnectionEventListener> listeners;
    private static Logger logger = LoggerFactory.getLogger(RpcClientChannel.class);
    private final ExtensionRegistry extensionRegistry;
    private RpcClientHandler rpcClientHandler;

    public RpcClientChannel(ServerInfo serverInfo, Bootstrap bootstrap, int i, boolean z, int i2, List<TcpConnectionEventListener> list, ExtensionRegistry extensionRegistry) {
        this.serverInfo = serverInfo;
        this.bootstrap = bootstrap;
        this.connectCorrelationId = i;
        this.tryUseCompress = z;
        this.connectTimeout = i2;
        this.listeners = list;
        this.extensionRegistry = extensionRegistry;
    }

    public void connect() throws IOException, TimeoutException, InterruptedException {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(this.serverInfo.getHost(), this.serverInfo.getPort());
        ChannelFuture awaitUninterruptibly = this.bootstrap.connect(inetSocketAddress).awaitUninterruptibly();
        if (!awaitUninterruptibly.isSuccess()) {
            throw new IOException("Failed to connect to " + inetSocketAddress, awaitUninterruptibly.cause());
        }
        this.channel = awaitUninterruptibly.channel();
        InetSocketAddress inetSocketAddress2 = (InetSocketAddress) this.channel.localAddress();
        this.clientPeer = new PeerInfo(inetSocketAddress2.getHostName(), inetSocketAddress2.getPort());
        sendConnectMsg();
        ClientConnectResponseHandler clientConnectResponseHandler = this.channel.pipeline().get(Handlers.CLIENT_CONNECT);
        if (clientConnectResponseHandler == null) {
            throw new IllegalStateException("No connectResponse handler in channel pipeline.");
        }
        ConnectResponse waitResponse = clientConnectResponseHandler.waitResponse(this.connectTimeout);
        checkConnectResponse(this.channel, this.connectCorrelationId, waitResponse);
        if (waitResponse.hasServerPID()) {
            this.serverPeer = new PeerInfo(inetSocketAddress.getHostName(), inetSocketAddress.getPort(), waitResponse.getServerPID());
        } else {
            this.serverPeer = new PeerInfo(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
        }
        this.useCompress = waitResponse.isCompress();
        logger.info("connect to {}:{}, use compress: {}", new Object[]{this.serverInfo.getHost(), Integer.valueOf(this.serverInfo.getPort()), Boolean.valueOf(this.useCompress)});
        this.rpcClientHandler = completePipeline(this.useCompress, this.channel.pipeline());
        this.rpcClientHandler.notifyOpened();
    }

    private void sendConnectMsg() {
        ConnectRequest connectRequest = new ConnectRequest();
        connectRequest.setClientHostName(this.clientPeer.getHostName());
        connectRequest.setClientPort(this.clientPeer.getPort());
        connectRequest.setClientPID(this.clientPeer.getPid());
        connectRequest.setCorrelationId(this.connectCorrelationId);
        connectRequest.setCompress(this.tryUseCompress);
        WirePayload wirePayload = new WirePayload();
        wirePayload.setConnectRequest(connectRequest);
        this.channel.writeAndFlush(wirePayload);
    }

    private void checkConnectResponse(Channel channel, int i, ConnectResponse connectResponse) throws IOException, TimeoutException {
        if (connectResponse == null) {
            channel.close().awaitUninterruptibly();
            throw new TimeoutException("connect to rpc server error, response is null");
        }
        if (connectResponse.hasErrorCode()) {
            channel.close().awaitUninterruptibly();
            throw new IOException("connect response error: " + connectResponse.getErrorCode());
        }
        if (!connectResponse.hasCorrelationId()) {
            channel.close().awaitUninterruptibly();
            throw new IOException("connect response missing connectCorrelationId.");
        }
        if (connectResponse.getCorrelationId() != i) {
            channel.close().awaitUninterruptibly();
            throw new IOException("Connect CorrelationId mismatch, sent " + i + " received " + connectResponse.getCorrelationId());
        }
    }

    private RpcClientHandler completePipeline(boolean z, ChannelPipeline channelPipeline) {
        if (z) {
            channelPipeline.addBefore(Handlers.FRAME_DECODER, Handlers.COMPRESSOR, ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
            channelPipeline.addAfter(Handlers.COMPRESSOR, Handlers.DECOMPRESSOR, ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
        }
        RpcClientHandler rpcClientHandler = new RpcClientHandler(new TcpConnectionEventListener() { // from class: net.dongliu.prettypb.rpc.client.RpcClientChannel.1
            @Override // net.dongliu.prettypb.rpc.listener.TcpConnectionEventListener
            public void connectionClosed(RpcClientChannel rpcClientChannel) {
                Iterator it = RpcClientChannel.this.listeners.iterator();
                while (it.hasNext()) {
                    ((TcpConnectionEventListener) it.next()).connectionClosed(rpcClientChannel);
                }
            }

            @Override // net.dongliu.prettypb.rpc.listener.TcpConnectionEventListener
            public void connectionOpened(RpcClientChannel rpcClientChannel) {
                Iterator it = RpcClientChannel.this.listeners.iterator();
                while (it.hasNext()) {
                    ((TcpConnectionEventListener) it.next()).connectionOpened(rpcClientChannel);
                }
            }
        }, this, this.extensionRegistry);
        channelPipeline.replace(Handlers.CLIENT_CONNECT, Handlers.RPC_CLIENT, rpcClientHandler);
        return rpcClientHandler;
    }

    public ListenableFuture sendRequest(ClientCallTask clientCallTask, RpcRequest rpcRequest) {
        WirePayload wirePayload = new WirePayload();
        wirePayload.setRpcRequest(rpcRequest);
        if (!this.rpcClientHandler.registerTask(clientCallTask)) {
            throw new ServiceException("duplicated correlation id, or task set closed");
        }
        this.channel.writeAndFlush(wirePayload);
        return clientCallTask.future();
    }

    public PeerInfo getServerPeer() {
        return this.serverPeer;
    }

    public PeerInfo getClientPeer() {
        return this.clientPeer;
    }

    public boolean isUseCompress() {
        return this.useCompress;
    }

    public ServerInfo getServerInfo() {
        return this.serverInfo;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.channel != null) {
            this.channel.close().awaitUninterruptibly();
        }
    }
}
