package io.r2dbc.postgresql.client;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.r2dbc.postgresql.api.ErrorDetails;
import io.r2dbc.postgresql.api.PostgresqlException;
import io.r2dbc.postgresql.message.backend.BackendKeyData;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.BackendMessageDecoder;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.Field;
import io.r2dbc.postgresql.message.backend.NoticeResponse;
import io.r2dbc.postgresql.message.backend.NotificationResponse;
import io.r2dbc.postgresql.message.backend.ParameterStatus;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Terminate;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import io.r2dbc.spi.R2dbcTransientResourceException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.StringJoiner;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.net.ssl.SSLException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.boot.autoconfigure.security.SecurityProperties;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.channel.AbortedException;
import reactor.netty.tcp.TcpClient;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

/* loaded from: input_file:BOOT-INF/lib/r2dbc-postgresql-1.0.0.RELEASE.jar:io/r2dbc/postgresql/client/ReactorNettyClient.class */
public final class ReactorNettyClient implements Client {
    static final String CONNECTION_FAILURE = "08006";
    private static final Logger logger = Loggers.getLogger((Class<?>) ReactorNettyClient.class);
    private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();
    private static final Supplier<PostgresConnectionClosedException> UNEXPECTED = () -> {
        return new PostgresConnectionClosedException("Connection unexpectedly closed");
    };
    private static final Supplier<PostgresConnectionClosedException> EXPECTED = () -> {
        return new PostgresConnectionClosedException("Connection closed");
    };
    private final ByteBufAllocator byteBufAllocator;
    private final ConnectionSettings settings;
    private final Connection connection;
    private ConnectionContext context;
    private volatile Integer processId;
    private volatile Integer secretKey;
    private volatile TimeZone timeZone;
    private final Sinks.Many<Publisher<FrontendMessage>> requestSink = Sinks.many().unicast().onBackpressureBuffer();
    private final Sinks.Many<NotificationResponse> notificationProcessor = Sinks.many().multicast().directBestEffort();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final BackendMessageSubscriber messageSubscriber = new BackendMessageSubscriber();
    private volatile TransactionStatus transactionStatus = TransactionStatus.IDLE;
    private volatile Version version = new Version("", 0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/r2dbc-postgresql-1.0.0.RELEASE.jar:io/r2dbc/postgresql/client/ReactorNettyClient$BackendMessageSubscriber.class */
    public class BackendMessageSubscriber implements CoreSubscriber<BackendMessage> {
        private static final int DEMAND = 256;
        private final Queue<Conversation> conversations;
        private final Queue<BackendMessage> buffer;
        private final AtomicLong demand;
        private final AtomicBoolean drain;
        private volatile boolean terminated;
        private Subscription upstream;

        private BackendMessageSubscriber() {
            this.conversations = (Queue) Queues.small().get();
            this.buffer = (Queue) Queues.get(256).get();
            this.demand = new AtomicLong(0L);
            this.drain = new AtomicBoolean();
        }

        public Flux<BackendMessage> addConversation(Predicate<BackendMessage> predicate, Publisher<FrontendMessage> publisher, Consumer<Publisher<FrontendMessage>> consumer, Supplier<Boolean> supplier) {
            return Flux.create(fluxSink -> {
                Conversation conversation = new Conversation(predicate, fluxSink);
                synchronized (this.conversations) {
                    if (this.conversations.offer(conversation)) {
                        fluxSink.onRequest(j -> {
                            onRequest(conversation, j);
                        });
                        if (!((Boolean) supplier.get()).booleanValue()) {
                            fluxSink.error(createClientClosedException());
                            return;
                        }
                        consumer.accept(publisher);
                    } else {
                        fluxSink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));
                    }
                }
            });
        }

        PostgresConnectionClosedException createClientClosedException() {
            return createClientClosedException(null);
        }

        PostgresConnectionClosedException createClientClosedException(@Nullable Throwable th) {
            return new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed", th);
        }

        public void onRequest(Conversation conversation, long j) {
            conversation.incrementDemand(j);
            demandMore();
            tryDrainLoop();
        }

        @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            this.upstream = subscription;
            demandMore();
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(BackendMessage backendMessage) {
            Conversation peek;
            if (this.terminated) {
                ReferenceCountUtil.release(backendMessage);
                Operators.onNextDropped(backendMessage, currentContext());
                return;
            }
            this.demand.decrementAndGet();
            if (this.buffer.isEmpty() && (peek = this.conversations.peek()) != null && peek.hasDemand()) {
                emit(peek, backendMessage);
                potentiallyDemandMore(peek);
            } else {
                if (this.buffer.offer(backendMessage)) {
                    tryDrainLoop();
                    return;
                }
                ReferenceCountUtil.release(backendMessage);
                Operators.onNextDropped(backendMessage, currentContext());
                onError(new ResponseQueueException("Response queue is full"));
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            if (this.terminated) {
                Operators.onErrorDropped(th, currentContext());
                return;
            }
            ReactorNettyClient.this.handleConnectionError(th);
            ReactorNettyClient.this.requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
            this.terminated = true;
            if (ReactorNettyClient.isSslException(th)) {
                ReactorNettyClient.logger.debug(ReactorNettyClient.this.context.getMessage("Connection Error"), th);
            } else {
                ReactorNettyClient.logger.error(ReactorNettyClient.this.context.getMessage("Connection Error"), th);
            }
            ReactorNettyClient.this.close().subscribe();
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            this.terminated = true;
            ReactorNettyClient.this.handleClose();
        }

        @Override // reactor.core.CoreSubscriber
        public Context currentContext() {
            Conversation peek = this.conversations.peek();
            return peek != null ? Context.of(peek.sink.contextView()) : Context.empty();
        }

        private void tryDrainLoop() {
            while (hasBufferedItems() && hasDownstreamDemand() && drainLoop()) {
            }
        }

        private boolean drainLoop() {
            BackendMessage poll;
            if (!this.drain.compareAndSet(false, true)) {
                return false;
            }
            Conversation conversation = null;
            while (hasBufferedItems()) {
                try {
                    Conversation peek = this.conversations.peek();
                    conversation = peek;
                    if (peek == null || !peek.hasDemand() || (poll = this.buffer.poll()) == null) {
                        break;
                    }
                    emit(peek, poll);
                } finally {
                    this.drain.compareAndSet(true, false);
                }
            }
            potentiallyDemandMore(conversation);
            return true;
        }

        private void potentiallyDemandMore(@Nullable Conversation conversation) {
            if (conversation == null || conversation.hasDemand() || conversation.isCancelled()) {
                demandMore();
            }
        }

        private void emit(Conversation conversation, BackendMessage backendMessage) {
            if (!conversation.canComplete(backendMessage)) {
                conversation.emit(backendMessage);
            } else {
                this.conversations.poll();
                conversation.complete(backendMessage);
            }
        }

        private void demandMore() {
            if (hasBufferedItems() || !this.demand.compareAndSet(0L, 256L)) {
                return;
            }
            this.upstream.request(256L);
        }

        private boolean hasDownstreamDemand() {
            Conversation peek = this.conversations.peek();
            return peek != null && peek.hasDemand();
        }

        private boolean hasBufferedItems() {
            return !this.buffer.isEmpty();
        }

        public void close(Supplier<? extends Throwable> supplier) {
            this.terminated = true;
            while (true) {
                Conversation poll = this.conversations.poll();
                if (poll == null) {
                    break;
                } else {
                    poll.onError(supplier.get());
                }
            }
            while (!this.buffer.isEmpty()) {
                ReferenceCountUtil.release(this.buffer.poll());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/r2dbc-postgresql-1.0.0.RELEASE.jar:io/r2dbc/postgresql/client/ReactorNettyClient$Conversation.class */
    public static class Conversation {
        private static final AtomicLongFieldUpdater<Conversation> DEMAND_UPDATER = AtomicLongFieldUpdater.newUpdater(Conversation.class, "demand");
        private final Predicate<BackendMessage> takeUntil;
        private final FluxSink<BackendMessage> sink;
        private volatile long demand;

        private Conversation(Predicate<BackendMessage> predicate, FluxSink<BackendMessage> fluxSink) {
            this.sink = fluxSink;
            this.takeUntil = predicate;
        }

        private void decrementDemand() {
            Operators.addCap(DEMAND_UPDATER, this, -1L);
        }

        public boolean canComplete(BackendMessage backendMessage) {
            return this.takeUntil.test(backendMessage);
        }

        public void complete(BackendMessage backendMessage) {
            ReferenceCountUtil.release(backendMessage);
            if (this.sink.isCancelled()) {
                return;
            }
            this.sink.complete();
        }

        public void emit(BackendMessage backendMessage) {
            if (this.sink.isCancelled()) {
                ReferenceCountUtil.release(backendMessage);
            }
            decrementDemand();
            this.sink.next(backendMessage);
        }

        public void onError(Throwable th) {
            if (this.sink.isCancelled()) {
                return;
            }
            this.sink.error(th);
        }

        public boolean hasDemand() {
            return DEMAND_UPDATER.get(this) > 0;
        }

        public boolean isCancelled() {
            return this.sink.isCancelled();
        }

        public void incrementDemand(long j) {
            Operators.addCap(DEMAND_UPDATER, this, j);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/r2dbc-postgresql-1.0.0.RELEASE.jar:io/r2dbc/postgresql/client/ReactorNettyClient$EnsureSubscribersCompleteChannelHandler.class */
    private final class EnsureSubscribersCompleteChannelHandler extends ChannelDuplexHandler {
        private final Sinks.Many<?> requestSink;

        private EnsureSubscribersCompleteChannelHandler(Sinks.Many<?> many) {
            this.requestSink = many;
        }

        @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            super.channelInactive(channelHandlerContext);
        }

        @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
        public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
            super.channelUnregistered(channelHandlerContext);
            this.requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
            ReactorNettyClient.this.handleClose();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/r2dbc-postgresql-1.0.0.RELEASE.jar:io/r2dbc/postgresql/client/ReactorNettyClient$PostgresConnectionClosedException.class */
    public static class PostgresConnectionClosedException extends R2dbcNonTransientResourceException implements PostgresqlException {
        private final ErrorDetails errorDetails;

        public PostgresConnectionClosedException(String str) {
            super(str, ReactorNettyClient.CONNECTION_FAILURE, 0, (String) null);
            this.errorDetails = ErrorDetails.fromCodeAndMessage(ReactorNettyClient.CONNECTION_FAILURE, str);
        }

        public PostgresConnectionClosedException(String str, @Nullable Throwable th) {
            super(str, ReactorNettyClient.CONNECTION_FAILURE, 0, null, th);
            this.errorDetails = ErrorDetails.fromCodeAndMessage(ReactorNettyClient.CONNECTION_FAILURE, str);
        }

        @Override // io.r2dbc.postgresql.api.PostgresqlException
        public ErrorDetails getErrorDetails() {
            return this.errorDetails;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/r2dbc-postgresql-1.0.0.RELEASE.jar:io/r2dbc/postgresql/client/ReactorNettyClient$PostgresConnectionException.class */
    public static class PostgresConnectionException extends R2dbcNonTransientResourceException implements PostgresqlException {
        private static final ErrorDetails ERROR_DETAILS = ErrorDetails.fromCodeAndMessage(ReactorNettyClient.CONNECTION_FAILURE, "An I/O error occurred while sending to the backend or receiving from the backend");

        public PostgresConnectionException(Throwable th) {
            super(ERROR_DETAILS.getMessage(), ERROR_DETAILS.getCode(), 0, null, th);
        }

        @Override // io.r2dbc.postgresql.api.PostgresqlException
        public ErrorDetails getErrorDetails() {
            return ERROR_DETAILS;
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/r2dbc-postgresql-1.0.0.RELEASE.jar:io/r2dbc/postgresql/client/ReactorNettyClient$RequestQueueException.class */
    static class RequestQueueException extends R2dbcTransientResourceException implements PostgresqlException {
        private final ErrorDetails errorDetails;

        public RequestQueueException(String str) {
            super(str, ReactorNettyClient.CONNECTION_FAILURE, 0, (String) null);
            this.errorDetails = ErrorDetails.fromCodeAndMessage(ReactorNettyClient.CONNECTION_FAILURE, str);
        }

        @Override // io.r2dbc.postgresql.api.PostgresqlException
        public ErrorDetails getErrorDetails() {
            return this.errorDetails;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/r2dbc-postgresql-1.0.0.RELEASE.jar:io/r2dbc/postgresql/client/ReactorNettyClient$ResponseQueueException.class */
    public static class ResponseQueueException extends R2dbcNonTransientResourceException implements PostgresqlException {
        private final ErrorDetails errorDetails;

        public ResponseQueueException(String str) {
            super(str, ReactorNettyClient.CONNECTION_FAILURE, 0, (String) null);
            this.errorDetails = ErrorDetails.fromCodeAndMessage(ReactorNettyClient.CONNECTION_FAILURE, str);
        }

        @Override // io.r2dbc.postgresql.api.PostgresqlException
        public ErrorDetails getErrorDetails() {
            return this.errorDetails;
        }
    }

    private ReactorNettyClient(Connection connection, ConnectionSettings connectionSettings) {
        Assert.requireNonNull(connection, "Connection must not be null");
        this.settings = (ConnectionSettings) Assert.requireNonNull(connectionSettings, "ConnectionSettings must not be null");
        connection.addHandlerFirst(new EnsureSubscribersCompleteChannelHandler(this.requestSink));
        connection.addHandlerLast(new LengthFieldBasedFrameDecoder(SecurityProperties.BASIC_AUTH_ORDER, 1, 4, -4, 0));
        this.connection = connection;
        this.byteBufAllocator = connection.outbound().alloc();
        this.context = new ConnectionContext().withChannelId(connection.channel().toString());
        AtomicReference atomicReference = new AtomicReference();
        Flux<V> map = connection.inbound().receive().map(BackendMessageDecoder::decode);
        atomicReference.getClass();
        map.doOnError((v1) -> {
            r1.set(v1);
        }).handle((backendMessage, synchronousSink) -> {
            if (consumeMessage(backendMessage)) {
                return;
            }
            synchronousSink.next(backendMessage);
        }).subscribe((CoreSubscriber) this.messageSubscriber);
        this.requestSink.asFlux().concatMap(Function.identity()).flatMap(frontendMessage -> {
            if (DEBUG_ENABLED) {
                logger.debug(this.context.getMessage(String.format("Request:  %s", frontendMessage)));
            }
            return connection.outbound().send(frontendMessage.encode(this.byteBufAllocator));
        }, 1).then().onErrorResume(this::resumeError).doAfterTerminate(this::handleClose).subscribe();
    }

    @Override // io.r2dbc.postgresql.client.Client
    public Mono<Void> close() {
        return Mono.defer(() -> {
            this.notificationProcessor.tryEmitComplete();
            drainError(EXPECTED);
            return this.isClosed.compareAndSet(false, true) ? (!isConnected() || this.processId == null) ? closeConnection() : Flux.just(Terminate.INSTANCE).doOnNext(terminate -> {
                logger.debug(this.context.getMessage(String.format("Request:  %s", terminate)));
            }).concatMap(terminate2 -> {
                return this.connection.outbound().send(terminate2.encode(this.connection.outbound().alloc()));
            }).then().doOnSuccess(r3 -> {
                this.connection.dispose();
            }).then(this.connection.onDispose()) : Mono.empty();
        });
    }

    private Mono<? extends Void> closeConnection() {
        this.connection.dispose();
        return this.connection.onDispose();
    }

    @Override // io.r2dbc.postgresql.client.Client
    public Flux<BackendMessage> exchange(Predicate<BackendMessage> predicate, Publisher<FrontendMessage> publisher) {
        Assert.requireNonNull(predicate, "takeUntil must not be null");
        Assert.requireNonNull(publisher, "requests must not be null");
        return !isConnected() ? Flux.error(this.messageSubscriber.createClientClosedException()) : this.messageSubscriber.addConversation(predicate, publisher, this::doSendRequest, this::isConnected);
    }

    @Override // io.r2dbc.postgresql.client.Client
    public void send(FrontendMessage frontendMessage) {
        Assert.requireNonNull(frontendMessage, "requests must not be null");
        doSendRequest(Mono.just(frontendMessage));
    }

    private void doSendRequest(Publisher<FrontendMessage> publisher) {
        this.requestSink.emitNext(publisher, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    private Mono<Void> resumeError(Throwable th) {
        handleConnectionError(th);
        this.requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
        if (isSslException(th)) {
            logger.debug(this.context.getMessage("Connection Error"), th);
        } else {
            logger.warn(this.context.getMessage("Connection Error"), th);
        }
        return close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean isSslException(Throwable th) {
        return (th instanceof SSLException) || (th.getCause() instanceof SSLException);
    }

    private boolean consumeMessage(BackendMessage backendMessage) {
        if (DEBUG_ENABLED) {
            logger.debug(this.context.getMessage(String.format("Response: %s", backendMessage)));
        }
        if (backendMessage.getClass() == NoticeResponse.class) {
            this.settings.getNoticeLogLevel().log(logger, () -> {
                return this.context.getMessage(String.format("Notice: %s", toString(((NoticeResponse) backendMessage).getFields())));
            });
            return true;
        }
        if (backendMessage.getClass() == BackendKeyData.class) {
            BackendKeyData backendKeyData = (BackendKeyData) backendMessage;
            this.processId = Integer.valueOf(backendKeyData.getProcessId());
            this.context = this.context.withProcessId(this.processId.intValue());
            this.secretKey = Integer.valueOf(backendKeyData.getSecretKey());
            return true;
        }
        if (backendMessage.getClass() == ErrorResponse.class) {
            this.settings.getErrorResponseLogLevel().log(logger, () -> {
                return String.format("Error: %s", toString(((ErrorResponse) backendMessage).getFields()));
            });
        }
        if (backendMessage.getClass() == ParameterStatus.class) {
            handleParameterStatus((ParameterStatus) backendMessage);
        }
        if (backendMessage.getClass() == ReadyForQuery.class) {
            this.transactionStatus = TransactionStatus.valueOf(((ReadyForQuery) backendMessage).getTransactionStatus());
        }
        if (backendMessage.getClass() != NotificationResponse.class) {
            return false;
        }
        this.notificationProcessor.tryEmitNext((NotificationResponse) backendMessage);
        return true;
    }

    private void handleParameterStatus(ParameterStatus parameterStatus) {
        String name2 = parameterStatus.getName();
        if (name2.equals("server_version_num") || name2.equals("server_version")) {
            Version version = this.version;
            String version2 = version.getVersion();
            int versionNumber = version.getVersionNumber();
            if (name2.equals("server_version_num")) {
                versionNumber = Integer.parseInt(parameterStatus.getValue());
            }
            if (name2.equals("server_version")) {
                version2 = parameterStatus.getValue();
                if (versionNumber == 0) {
                    versionNumber = Version.parseServerVersionStr(version2);
                }
            }
            this.version = new Version(version2, versionNumber);
        }
        if (name2.equals("TimeZone")) {
            this.timeZone = TimeZoneUtils.parseBackendTimeZone(parameterStatus.getValue());
        }
    }

    public static Mono<ReactorNettyClient> connect(String str, int i) {
        Assert.requireNonNull(str, "host must not be null");
        return connect(str, i, null, new SSLConfig(SSLMode.DISABLE, null, null));
    }

    public static Mono<ReactorNettyClient> connect(String str, int i, @Nullable Duration duration, SSLConfig sSLConfig) {
        Assert.requireNonNull(str, "host must not be null");
        Assert.requireNonNull(sSLConfig, "sslConfig must not be null");
        return connect(InetSocketAddress.createUnresolved(str, i), ConnectionSettings.builder().connectTimeout(duration).sslConfig(sSLConfig).build());
    }

    public static Mono<ReactorNettyClient> connect(SocketAddress socketAddress, ConnectionSettings connectionSettings) {
        Assert.requireNonNull(socketAddress, "socketAddress must not be null");
        Assert.requireNonNull(connectionSettings, "settings must not be null");
        TcpClient remoteAddress = TcpClient.create(connectionSettings.getConnectionProvider()).remoteAddress(() -> {
            return socketAddress;
        });
        if (connectionSettings.hasLoopResources()) {
            remoteAddress = remoteAddress.runOn(connectionSettings.getRequiredLoopResources());
        }
        if (socketAddress instanceof InetSocketAddress) {
            remoteAddress = remoteAddress.resolver((AddressResolverGroup<?>) BalancedResolverGroup.INSTANCE).option((ChannelOption<ChannelOption<Boolean>>) ChannelOption.SO_KEEPALIVE, (ChannelOption<Boolean>) Boolean.valueOf(connectionSettings.isTcpKeepAlive())).option((ChannelOption<ChannelOption<Boolean>>) ChannelOption.TCP_NODELAY, (ChannelOption<Boolean>) Boolean.valueOf(connectionSettings.isTcpNoDelay()));
        }
        if (connectionSettings.hasConnectionTimeout()) {
            remoteAddress = remoteAddress.option((ChannelOption<ChannelOption<Integer>>) ChannelOption.CONNECT_TIMEOUT_MILLIS, (ChannelOption<Integer>) Integer.valueOf(connectionSettings.getConnectTimeoutMs()));
        }
        return remoteAddress.connect().flatMap(connection -> {
            ChannelPipeline pipeline = connection.channel().pipeline();
            if (InternalLoggerFactory.getInstance((Class<?>) ReactorNettyClient.class).isTraceEnabled()) {
                pipeline.addFirst(LoggingHandler.class.getSimpleName(), new LoggingHandler((Class<?>) ReactorNettyClient.class, LogLevel.TRACE));
            }
            return registerSslHandler(connectionSettings.getSslConfig(), connection).thenReturn(new ReactorNettyClient(connection, connectionSettings));
        });
    }

    private static Mono<? extends Void> registerSslHandler(SSLConfig sSLConfig, Connection connection) {
        try {
            return sSLConfig.getSslMode().startSsl() ? Mono.defer(() -> {
                AbstractPostgresSSLHandlerAdapter sSLTunnelHandlerAdapter = sSLConfig.getSslMode() == SSLMode.TUNNEL ? new SSLTunnelHandlerAdapter(connection.outbound().alloc(), sSLConfig) : new SSLSessionHandlerAdapter(connection.outbound().alloc(), sSLConfig);
                connection.addHandlerFirst(sSLTunnelHandlerAdapter);
                return sSLTunnelHandlerAdapter.getHandshake();
            }).subscribeOn(Schedulers.boundedElastic()) : Mono.empty();
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }

    @Override // io.r2dbc.postgresql.client.Client
    public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
        return this.notificationProcessor.asFlux().subscribe(consumer);
    }

    @Override // io.r2dbc.postgresql.client.Client
    public void addNotificationListener(Subscriber<NotificationResponse> subscriber) {
        this.notificationProcessor.asFlux().subscribe(subscriber);
    }

    @Override // io.r2dbc.postgresql.client.Client
    public ByteBufAllocator getByteBufAllocator() {
        return this.byteBufAllocator;
    }

    @Override // io.r2dbc.postgresql.client.Client
    public ConnectionContext getContext() {
        return this.context;
    }

    @Override // io.r2dbc.postgresql.client.Client
    public Optional<Integer> getProcessId() {
        return Optional.ofNullable(this.processId);
    }

    @Override // io.r2dbc.postgresql.client.Client
    public Optional<Integer> getSecretKey() {
        return Optional.ofNullable(this.secretKey);
    }

    @Override // io.r2dbc.postgresql.client.Client
    public Optional<TimeZone> getTimeZone() {
        return Optional.ofNullable(this.timeZone);
    }

    @Override // io.r2dbc.postgresql.client.Client
    public TransactionStatus getTransactionStatus() {
        return this.transactionStatus;
    }

    @Override // io.r2dbc.postgresql.client.Client
    public Version getVersion() {
        return this.version;
    }

    @Override // io.r2dbc.postgresql.client.Client
    public boolean isConnected() {
        if (this.isClosed.get()) {
            return false;
        }
        return this.connection.channel().isOpen();
    }

    @Override // io.r2dbc.postgresql.client.Client
    public Mono<Void> cancelRequest() {
        return Mono.defer(() -> {
            int intValue = getProcessId().orElseThrow(() -> {
                return new IllegalStateException("Connection does not yet have a processId");
            }).intValue();
            int intValue2 = getSecretKey().orElseThrow(() -> {
                return new IllegalStateException("Connection does not yet have a secretKey");
            }).intValue();
            return connect(this.connection.channel().remoteAddress(), this.settings).flatMap(reactorNettyClient -> {
                Mono<Void> exchange = CancelRequestMessageFlow.exchange(reactorNettyClient, intValue, intValue2);
                reactorNettyClient.getClass();
                Mono<V> then = exchange.then(Mono.defer(reactorNettyClient::closeConnection));
                Class<PostgresConnectionClosedException> cls = PostgresConnectionClosedException.class;
                PostgresConnectionClosedException.class.getClass();
                return then.onErrorResume((v1) -> {
                    return r1.isInstance(v1);
                }, (Function<? super Throwable, ? extends Mono<? extends V>>) th -> {
                    return Mono.empty();
                });
            });
        });
    }

    private static String toString(List<Field> list) {
        StringJoiner stringJoiner = new StringJoiner(", ");
        for (Field field : list) {
            stringJoiner.add(field.getType().name() + "=" + field.getValue());
        }
        return stringJoiner.toString();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleClose() {
        if (this.isClosed.compareAndSet(false, true)) {
            drainError(UNEXPECTED);
        } else {
            drainError(EXPECTED);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleConnectionError(Throwable th) {
        if (AbortedException.isConnectionReset(th) && !isConnected()) {
            drainError(() -> {
                return this.messageSubscriber.createClientClosedException(th);
            });
        }
        drainError(() -> {
            return new PostgresConnectionException(th);
        });
    }

    private void drainError(Supplier<? extends Throwable> supplier) {
        this.messageSubscriber.close(supplier);
        this.notificationProcessor.tryEmitError(supplier.get());
    }

    static {
        Schedulers.boundedElastic();
    }
}
