package com.alibaba.rsocket.rpc;

import brave.Span;
import brave.Tracer;
import brave.propagation.TraceContext;
import com.alibaba.rsocket.RSocketAppContext;
import com.alibaba.rsocket.cloudevents.CloudEventImpl;
import com.alibaba.rsocket.cloudevents.CloudEventRSocket;
import com.alibaba.rsocket.cloudevents.EventReply;
import com.alibaba.rsocket.listen.RSocketResponderSupport;
import com.alibaba.rsocket.metadata.AppMetadata;
import com.alibaba.rsocket.metadata.BinaryRoutingMetadata;
import com.alibaba.rsocket.metadata.GSVRoutingMetadata;
import com.alibaba.rsocket.metadata.MessageMimeTypeMetadata;
import com.alibaba.rsocket.metadata.RSocketCompositeMetadata;
import com.alibaba.rsocket.metadata.RSocketMimeType;
import com.alibaba.rsocket.metadata.TracingMetadata;
import com.alibaba.rsocket.observability.RsocketErrorCode;
import com.fasterxml.jackson.databind.JsonNode;
import io.netty.util.ReferenceCountUtil;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.exceptions.InvalidException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;

/* loaded from: input_file:BOOT-INF/lib/alibaba-rsocket-core-1.1.6.jar:com/alibaba/rsocket/rpc/RSocketResponderHandler.class */
/**
 * Responder side of an RSocket connection: routes incoming requests to locally published services
 * through the {@code local*} methods inherited from {@link RSocketResponderSupport}, and exchanges
 * CloudEvents with the peer.
 *
 * <p>Every incoming request must carry routing metadata; the payload encoding is taken from the
 * request metadata, falling back to the default derived from the connection setup payload. When a
 * {@link Tracer} has been supplied and the request carries tracing metadata, request/response,
 * fire-and-forget and request/stream calls are wrapped in a Brave child span.
 */
public class RSocketResponderHandler extends RSocketResponderSupport implements CloudEventRSocket {
    /** Number of handlers whose connection has been set up and has not terminated yet. */
    public static final AtomicInteger CONNECTION_COUNTER = new AtomicInteger();

    /** Requester used to send fire-and-forget and metadata-push frames to the peer. */
    protected RSocket requester;

    /** Encoding used when a request does not declare one; {@code null} when no default could be derived. */
    @Nullable
    protected MessageMimeTypeMetadata defaultMessageMimeType;

    /** Terminates as soon as either this responder or the requester terminates. */
    private final Mono<Void> comboOnClose;

    /** Sink that received CloudEvents are emitted into. */
    protected Sinks.Many<CloudEventImpl> eventProcessor;

    /** {@code true} when the Brave tracing classes could be loaded. */
    private final boolean braveTracing;

    /** Optional tracer; tracing is skipped while this is {@code null}. */
    private Tracer tracer;

    /**
     * Creates the handler for a freshly accepted connection.
     *
     * @param localReactiveServiceCaller caller used to invoke locally published services
     * @param many sink that received CloudEvents are emitted into
     * @param rSocket requester for sending frames to the peer
     * @param connectionSetupPayload setup payload of the connection, used to derive the default encoding
     */
    public RSocketResponderHandler(LocalReactiveServiceCaller localReactiveServiceCaller, Sinks.Many<CloudEventImpl> many, RSocket rSocket, ConnectionSetupPayload connectionSetupPayload) {
        this.localServiceCaller = localReactiveServiceCaller;
        this.eventProcessor = many;
        this.requester = rSocket;
        this.comboOnClose = Mono.first(super.onClose(), rSocket.onClose());
        this.defaultMessageMimeType = resolveDefaultMessageMimeType(connectionSetupPayload);
        this.braveTracing = isBraveAvailable();

        // Count the connection only once setup has succeeded, so a failing constructor cannot
        // leave the counter unbalanced.
        CONNECTION_COUNTER.incrementAndGet();
        // A Mono<Void> never emits onNext, so the decrement has to hang on the terminal signal.
        // A close error is swallowed on this bookkeeping subscription only; it stays observable
        // to everyone subscribing through onClose().
        this.comboOnClose
                .doOnTerminate(CONNECTION_COUNTER::decrementAndGet)
                .onErrorResume(error -> Mono.empty())
                .subscribe();
    }

    /**
     * Derives the default payload encoding from the connection setup payload.
     *
     * @return the encoding declared by the setup payload, or {@code null} when the setup carries no
     *     application metadata, when its UUID equals {@link RSocketAppContext#ID}, or when the data
     *     MIME type is not a known {@link RSocketMimeType}
     */
    @Nullable
    private static MessageMimeTypeMetadata resolveDefaultMessageMimeType(ConnectionSetupPayload setupPayload) {
        RSocketCompositeMetadata setupMetadata = RSocketCompositeMetadata.from(setupPayload.metadata());
        if (!setupMetadata.contains(RSocketMimeType.Application)) {
            return null;
        }
        AppMetadata appMetadata = AppMetadata.from(setupMetadata.getMetadata(RSocketMimeType.Application));
        // Null-safe comparison: a setup without a UUID is treated like any other foreign application.
        if (Objects.equals(appMetadata.getUuid(), RSocketAppContext.ID)) {
            return null;
        }
        RSocketMimeType dataMimeType = RSocketMimeType.valueOfType(setupPayload.dataMimeType());
        return dataMimeType == null ? null : new MessageMimeTypeMetadata(dataMimeType);
    }

    /** Reports whether the Brave tracing classes can be loaded. */
    private static boolean isBraveAvailable() {
        try {
            Class.forName("brave.propagation.TraceContext");
            return true;
        } catch (ClassNotFoundException ignored) {
            // Brave is optional; without it tracing is simply disabled.
            return false;
        }
    }

    /** Error returned when a request carries no routing metadata. */
    private static InvalidException missingRoutingError() {
        return new InvalidException(RsocketErrorCode.message("RST-600404"));
    }

    /** Error returned when neither the request nor the connection defines a payload encoding. */
    private static InvalidException missingEncodingError() {
        return new InvalidException(RsocketErrorCode.message("RST-700404"));
    }

    /**
     * Sets the tracer used to create child spans for incoming requests.
     *
     * @param tracer tracer to use; {@code null} disables tracing
     */
    public void setTracer(Tracer tracer) {
        this.tracer = tracer;
    }

    /**
     * Handles a request/response call by dispatching it to the local service.
     *
     * @param payload request payload; released here when the request is rejected
     * @return the service response, or an {@link InvalidException} error when routing or encoding
     *     metadata is missing
     */
    @Override // io.rsocket.RSocket
    @NotNull
    public Mono<Payload> requestResponse(Payload payload) {
        RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(payload.metadata());
        GSVRoutingMetadata routing = getGsvRoutingMetadata(compositeMetadata);
        if (routing == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(missingRoutingError());
        }
        MessageMimeTypeMetadata encoding = getDataEncodingMetadata(compositeMetadata);
        if (encoding == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(missingEncodingError());
        }
        Mono<Payload> response = localRequestResponse(routing, encoding, compositeMetadata.getAcceptMimeTypesMetadata(), payload);
        return injectTraceContext(response, compositeMetadata);
    }

    /**
     * Handles a fire-and-forget call by dispatching it to the local service.
     *
     * @param payload request payload; released here when the request is rejected
     * @return completion of the local call, or an {@link InvalidException} error when routing or
     *     encoding metadata is missing
     */
    @Override // io.rsocket.RSocket
    @NotNull
    public Mono<Void> fireAndForget(Payload payload) {
        RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(payload.metadata());
        GSVRoutingMetadata routing = getGsvRoutingMetadata(compositeMetadata);
        if (routing == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(missingRoutingError());
        }
        MessageMimeTypeMetadata encoding = getDataEncodingMetadata(compositeMetadata);
        if (encoding == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(missingEncodingError());
        }
        return injectTraceContext(localFireAndForget(routing, encoding, payload), compositeMetadata);
    }

    /**
     * Stamps the event with this handler's sourcing and emits it into the event sink once the
     * returned {@link Mono} is subscribed.
     *
     * @param cloudEventImpl event to publish locally
     */
    @Override // com.alibaba.rsocket.cloudevents.CloudEventRSocket
    public Mono<Void> fireCloudEvent(CloudEventImpl<?> cloudEventImpl) {
        cloudEventImpl.setSourcing(this.sourcing);
        // NOTE(review): the result of tryEmitNext is ignored, so a failed emission is dropped
        // silently - confirm that best-effort delivery is intended.
        return Mono.fromRunnable(() -> this.eventProcessor.tryEmitNext(cloudEventImpl));
    }

    /**
     * Sends an event reply to the peer as a fire-and-forget frame.
     *
     * @param uri reply-to address
     * @param eventReply reply to deliver
     */
    @Override // com.alibaba.rsocket.cloudevents.CloudEventRSocket
    public Mono<Void> fireEventReply(URI uri, EventReply eventReply) {
        return this.requester.fireAndForget(constructEventReplyPayload(uri, eventReply));
    }

    /**
     * Handles a request/stream call by dispatching it to the local service.
     *
     * @param payload request payload; released here when the request is rejected
     * @return the service's response stream, or an {@link InvalidException} error when routing or
     *     encoding metadata is missing
     */
    @Override // io.rsocket.RSocket
    @NotNull
    public Flux<Payload> requestStream(Payload payload) {
        RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(payload.metadata());
        GSVRoutingMetadata routing = getGsvRoutingMetadata(compositeMetadata);
        if (routing == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(missingRoutingError());
        }
        MessageMimeTypeMetadata encoding = getDataEncodingMetadata(compositeMetadata);
        if (encoding == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(missingEncodingError());
        }
        Flux<Payload> responses = localRequestStream(routing, encoding, compositeMetadata.getAcceptMimeTypesMetadata(), payload);
        return injectTraceContext(responses, compositeMetadata);
    }

    /**
     * Handles a request/channel call whose first payload has already been extracted.
     *
     * @param payload first payload of the channel, carrying the routing metadata; released here
     *     when the request is rejected
     * @param publisher the complete inbound stream, including the first payload
     * @return the service's response stream, or an {@link InvalidException} error when routing or
     *     encoding metadata is missing
     */
    public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> publisher) {
        RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(payload.metadata());
        GSVRoutingMetadata routing = getGsvRoutingMetadata(compositeMetadata);
        if (routing == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(missingRoutingError());
        }
        MessageMimeTypeMetadata encoding = getDataEncodingMetadata(compositeMetadata);
        if (encoding == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(missingEncodingError());
        }
        // Flux.from returns the publisher itself when it already is a Flux. The first element is
        // skipped because it is handed over separately as `payload`.
        Flux<Payload> remaining = Flux.from(publisher).skip(1L);
        return localRequestChannel(routing, encoding, compositeMetadata.getAcceptMimeTypesMetadata(), payload, remaining);
    }

    /**
     * Handles a request/channel call by routing on the first payload of the inbound stream.
     *
     * @param publisher inbound payload stream; must be a {@link Flux}
     * @return the service's response stream, or an {@link InvalidException} error when
     *     {@code publisher} is not a {@link Flux}
     */
    @Override // io.rsocket.RSocket
    @NotNull
    public final Flux<Payload> requestChannel(@NotNull Publisher<Payload> publisher) {
        if (!(publisher instanceof Flux)) {
            return Flux.error(new InvalidException(RsocketErrorCode.message("RST-201400")));
        }
        Flux<Payload> inbound = (Flux<Payload>) publisher;
        return inbound.switchOnFirst((signal, flux) -> {
            Payload first = signal.get();
            if (first == null) {
                // The stream terminated before emitting anything; there is nothing to route, so
                // just pass the terminal signal through instead of dereferencing null.
                return flux;
            }
            return requestChannel(first, flux);
        });
    }

    /**
     * Handles a metadata-push frame: extracts a CloudEvent from the payload, if there is one, and
     * publishes it locally. The payload is always released before this method returns.
     *
     * @param payload metadata-push payload
     * @return the publication of the extracted event, or an empty {@link Mono} when the payload
     *     carries no event or the extraction failed (the failure is logged)
     */
    @Override // io.rsocket.RSocket
    @NotNull
    public Mono<Void> metadataPush(@NotNull Payload payload) {
        try {
            if (payload.metadata().readableBytes() > 0) {
                CloudEventImpl<JsonNode> cloudEvent = extractCloudEventsFromMetadataPush(payload);
                if (cloudEvent != null) {
                    return fireCloudEvent(cloudEvent);
                }
            }
        } catch (Exception e) {
            // NOTE(review): the nested RsocketErrorCode.message call is kept as found; it passes an
            // already formatted message back in as a key - confirm whether that is intended.
            this.log.error(RsocketErrorCode.message(RsocketErrorCode.message("RST-610500", e.getMessage())), e);
        } finally {
            ReferenceCountUtil.safeRelease(payload);
        }
        return Mono.empty();
    }

    /**
     * Sends a CloudEvent to the peer as a metadata-push frame.
     *
     * @param cloudEventImpl event to send
     * @return completion of the push, or an error {@link Mono} when the payload could not be built
     */
    public Mono<Void> fireCloudEventToPeer(CloudEventImpl<?> cloudEventImpl) {
        try {
            return this.requester.metadataPush(cloudEventToMetadataPushPayload(cloudEventImpl));
        } catch (Exception e) {
            return Mono.error(e);
        }
    }

    /** Returns a {@link Mono} that terminates once either the responder or the requester has terminated. */
    @Override // com.alibaba.rsocket.AbstractRSocket, io.rsocket.RSocket, io.rsocket.Closeable
    @NotNull
    public Mono<Void> onClose() {
        return this.comboOnClose;
    }

    /**
     * Resolves the payload encoding of a request, preferring the request's own metadata over the
     * connection default.
     *
     * @return the encoding, or {@code null} when neither source defines one
     */
    @Nullable
    private MessageMimeTypeMetadata getDataEncodingMetadata(RSocketCompositeMetadata rSocketCompositeMetadata) {
        MessageMimeTypeMetadata requestEncoding = rSocketCompositeMetadata.getDataEncodingMetadata();
        if (requestEncoding != null) {
            return requestEncoding;
        }
        return this.defaultMessageMimeType;
    }

    /**
     * Resolves the routing metadata of a request, falling back to parsing the routing text of the
     * binary routing metadata when no GSV routing metadata is present.
     *
     * @return the routing metadata, or {@code null} when the request carries none
     */
    @Nullable
    private GSVRoutingMetadata getGsvRoutingMetadata(RSocketCompositeMetadata rSocketCompositeMetadata) {
        BinaryRoutingMetadata binaryRouting = rSocketCompositeMetadata.getBinaryRoutingMetadata();
        GSVRoutingMetadata gsvRouting = rSocketCompositeMetadata.getRoutingMetaData();
        if (gsvRouting != null || binaryRouting == null) {
            return gsvRouting;
        }
        String routingText = new String(binaryRouting.getRoutingText(), StandardCharsets.UTF_8);
        return GSVRoutingMetadata.from(routingText);
    }

    /** Rebuilds a Brave {@link TraceContext} from the ids carried in the request's tracing metadata. */
    @NotNull
    private TraceContext constructTraceContext(@NotNull TracingMetadata tracingMetadata) {
        return TraceContext.newBuilder()
                .parentId(tracingMetadata.parentId())
                .spanId(tracingMetadata.spanId())
                .traceIdHigh(tracingMetadata.traceIdHigh())
                .traceId(tracingMetadata.traceId())
                .build();
    }

    /**
     * Wraps a single-result call in a child span of the caller's trace and writes the caller's
     * {@link TraceContext} into the Reactor context under the key {@code TraceContext.class}.
     *
     * @return {@code mono} unchanged when tracing is unavailable, no tracer is set or the request
     *     carries no tracing metadata; otherwise the instrumented {@link Mono}
     */
    <T> Mono<T> injectTraceContext(Mono<T> mono, RSocketCompositeMetadata rSocketCompositeMetadata) {
        if (!this.braveTracing || this.tracer == null) {
            return mono;
        }
        TracingMetadata tracingMetadata = rSocketCompositeMetadata.getTracingMetadata();
        if (tracingMetadata == null) {
            return mono;
        }
        TraceContext parentContext = constructTraceContext(tracingMetadata);
        Span span = this.tracer.newChild(parentContext);
        // Span.error only records the failure; the span is finished in doFinally so that it is
        // also closed on error and on cancellation, not just on success.
        return mono.doOnError(span::error)
                .doFinally(signalType -> span.finish())
                .contextWrite(Context.of(TraceContext.class, parentContext));
    }

    /**
     * Wraps a streaming call in a child span of the caller's trace and writes the caller's
     * {@link TraceContext} into the Reactor context under the key {@code TraceContext.class}.
     *
     * @return {@code flux} unchanged when tracing is unavailable, no tracer is set or the request
     *     carries no tracing metadata; otherwise the instrumented {@link Flux}
     */
    Flux<Payload> injectTraceContext(Flux<Payload> flux, RSocketCompositeMetadata rSocketCompositeMetadata) {
        if (!this.braveTracing || this.tracer == null) {
            return flux;
        }
        TracingMetadata tracingMetadata = rSocketCompositeMetadata.getTracingMetadata();
        if (tracingMetadata == null) {
            return flux;
        }
        TraceContext parentContext = constructTraceContext(tracingMetadata);
        Span span = this.tracer.newChild(parentContext);
        // Span.error only records the failure; the span is finished in doFinally so that it is
        // also closed on error and on cancellation, not just on completion.
        return flux.doOnError(span::error)
                .doFinally(signalType -> span.finish())
                .contextWrite(Context.of(TraceContext.class, parentContext));
    }
}
