package io.rsocket.transport.local;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.internal.UnboundedProcessor;
import java.net.SocketAddress;
import java.util.Objects;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/rsocket-transport-local-1.1.3.jar:io/rsocket/transport/local/LocalDuplexConnection.class */
public final class LocalDuplexConnection implements DuplexConnection {
    private final LocalSocketAddress address;
    private final ByteBufAllocator allocator;
    private final Flux<ByteBuf> in;
    private final Mono<Void> onClose;
    private final UnboundedProcessor out;

    /* loaded from: input_file:BOOT-INF/lib/rsocket-transport-local-1.1.3.jar:io/rsocket/transport/local/LocalDuplexConnection$ByteBufReleaserOperator.class */
    static class ByteBufReleaserOperator implements CoreSubscriber<ByteBuf>, Subscription, Fuseable.QueueSubscription<ByteBuf> {
        final CoreSubscriber<? super ByteBuf> actual;
        final LocalDuplexConnection parent;
        Subscription s;

        public ByteBufReleaserOperator(CoreSubscriber<? super ByteBuf> coreSubscriber, LocalDuplexConnection localDuplexConnection) {
            this.actual = coreSubscriber;
            this.parent = localDuplexConnection;
        }

        @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            if (Operators.validate(this.s, subscription)) {
                this.s = subscription;
                this.actual.onSubscribe(this);
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(ByteBuf byteBuf) {
            try {
                this.actual.onNext(byteBuf);
            } finally {
                byteBuf.release();
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            this.parent.out.onError(th);
            this.actual.onError(th);
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            this.parent.out.onComplete();
            this.actual.onComplete();
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            this.s.request(j);
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            this.s.cancel();
            this.parent.out.onComplete();
        }

        @Override // reactor.core.Fuseable.QueueSubscription
        public int requestFusion(int i) {
            return 0;
        }

        @Override // java.util.Queue
        public ByteBuf poll() {
            throw new UnsupportedOperationException(Fuseable.QueueSubscription.NOT_SUPPORTED_MESSAGE);
        }

        @Override // java.util.Collection
        public int size() {
            throw new UnsupportedOperationException(Fuseable.QueueSubscription.NOT_SUPPORTED_MESSAGE);
        }

        @Override // java.util.Collection
        public boolean isEmpty() {
            throw new UnsupportedOperationException(Fuseable.QueueSubscription.NOT_SUPPORTED_MESSAGE);
        }

        @Override // java.util.Collection
        public void clear() {
            throw new UnsupportedOperationException(Fuseable.QueueSubscription.NOT_SUPPORTED_MESSAGE);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LocalDuplexConnection(String str, ByteBufAllocator byteBufAllocator, Flux<ByteBuf> flux, UnboundedProcessor unboundedProcessor, Mono<Void> mono) {
        this.address = new LocalSocketAddress(str);
        this.allocator = (ByteBufAllocator) Objects.requireNonNull(byteBufAllocator, "allocator must not be null");
        this.in = (Flux) Objects.requireNonNull(flux, "in must not be null");
        this.out = (UnboundedProcessor) Objects.requireNonNull(unboundedProcessor, "out must not be null");
        this.onClose = (Mono) Objects.requireNonNull(mono, "onClose must not be null");
    }

    @Override // reactor.core.Disposable
    public void dispose() {
        this.out.onComplete();
    }

    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        return this.out.isDisposed();
    }

    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.onClose;
    }

    @Override // io.rsocket.DuplexConnection
    public Flux<ByteBuf> receive() {
        return this.in.transform(Operators.lift((scannable, coreSubscriber) -> {
            return new ByteBufReleaserOperator(coreSubscriber, this);
        }));
    }

    @Override // io.rsocket.DuplexConnection
    public void sendFrame(int i, ByteBuf byteBuf) {
        if (i == 0) {
            this.out.onNextPrioritized(byteBuf);
        } else {
            this.out.onNext(byteBuf);
        }
    }

    @Override // io.rsocket.DuplexConnection
    public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
        this.out.onNext(ErrorFrameCodec.encode(this.allocator, 0, rSocketErrorException));
        dispose();
    }

    @Override // io.rsocket.DuplexConnection
    public ByteBufAllocator alloc() {
        return this.allocator;
    }

    @Override // io.rsocket.DuplexConnection
    public SocketAddress remoteAddress() {
        return this.address;
    }
}
