package com.alibaba.rsocket.rpc;

import com.alibaba.rsocket.RSocketAppContext;
import com.alibaba.rsocket.cloudevents.CloudEventRSocket;
import com.alibaba.rsocket.cloudevents.EventReply;
import com.alibaba.rsocket.listen.RSocketResponderSupport;
import com.alibaba.rsocket.metadata.AppMetadata;
import com.alibaba.rsocket.metadata.BinaryRoutingMetadata;
import com.alibaba.rsocket.metadata.GSVRoutingMetadata;
import com.alibaba.rsocket.metadata.MessageMimeTypeMetadata;
import com.alibaba.rsocket.metadata.RSocketCompositeMetadata;
import com.alibaba.rsocket.metadata.RSocketMimeType;
import com.alibaba.rsocket.observability.RsocketErrorCode;
import io.cloudevents.json.Json;
import io.cloudevents.v1.CloudEventImpl;
import io.netty.util.ReferenceCountUtil;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.ResponderRSocket;
import io.rsocket.exceptions.InvalidException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.extra.processor.TopicProcessor;

/* loaded from: input_file:BOOT-INF/lib/alibaba-rsocket-core-1.0.0.M1.jar:com/alibaba/rsocket/rpc/RSocketResponderHandler.class */
/**
 * Responder-side handler for one RSocket connection.
 *
 * <p>Incoming request/response, fire-and-forget, stream and channel calls are resolved to a
 * routing target and a data encoding taken from the payload's composite metadata, then handed
 * to the {@code local*} methods inherited from {@link RSocketResponderSupport}. Metadata-push
 * frames are decoded as CloudEvents and published to {@link #eventProcessor}.
 */
public class RSocketResponderHandler extends RSocketResponderSupport implements ResponderRSocket, CloudEventRSocket {
    /** RSocket used to send frames back to the connected peer. */
    protected RSocket requester;

    /**
     * Encoding used when a request carries no data-encoding metadata of its own;
     * {@code null} when the connection setup did not yield one.
     */
    @Nullable
    protected MessageMimeTypeMetadata defaultMessageMimeType;

    /** Completes as soon as either this responder or the requester signals close. */
    private final Mono<Void> comboOnClose;

    /** Sink that receives every CloudEvent passed to {@link #fireCloudEvent(CloudEventImpl)}. */
    protected TopicProcessor<CloudEventImpl> eventProcessor;

    /**
     * Creates the handler and derives the default message encoding from the setup payload.
     *
     * <p>A default encoding is only recorded when the setup metadata contains application
     * metadata whose UUID differs from {@link RSocketAppContext#ID} and the setup's data MIME
     * type is recognised by {@link RSocketMimeType#valueOfType(String)}.
     *
     * @param localReactiveServiceCaller caller used by the inherited {@code local*} dispatch methods
     * @param topicProcessor processor that received CloudEvents are published to
     * @param rSocket requester side of the connection
     * @param connectionSetupPayload setup frame of the connection
     */
    public RSocketResponderHandler(LocalReactiveServiceCaller localReactiveServiceCaller, TopicProcessor<CloudEventImpl> topicProcessor, RSocket rSocket, ConnectionSetupPayload connectionSetupPayload) {
        this.localServiceCaller = localReactiveServiceCaller;
        this.eventProcessor = topicProcessor;
        this.requester = rSocket;
        this.comboOnClose = Mono.first(super.onClose(), rSocket.onClose());
        RSocketCompositeMetadata setupMetadata = RSocketCompositeMetadata.from(connectionSetupPayload.metadata());
        if (setupMetadata.contains(RSocketMimeType.Application)) {
            AppMetadata appMetadata = AppMetadata.from(setupMetadata.getMetadata(RSocketMimeType.Application));
            // Skip connections whose application UUID equals this application's own ID.
            if (!appMetadata.getUuid().equals(RSocketAppContext.ID)) {
                RSocketMimeType setupDataType = RSocketMimeType.valueOfType(connectionSetupPayload.dataMimeType());
                if (setupDataType != null) {
                    this.defaultMessageMimeType = new MessageMimeTypeMetadata(setupDataType);
                }
            }
        }
    }

    /**
     * Dispatches a request/response call to the local service.
     *
     * @param payload incoming request; released here when it cannot be dispatched
     * @return the local call's result, or an {@link InvalidException} error when routing
     *     metadata (RST-600404) or data encoding (RST-700404) cannot be resolved
     */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(payload.metadata());
        GSVRoutingMetadata routing = getGsvRoutingMetadata(compositeMetadata);
        if (routing == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(new InvalidException(RsocketErrorCode.message("RST-600404")));
        }
        MessageMimeTypeMetadata dataEncoding = getDataEncodingMetadata(compositeMetadata);
        if (dataEncoding == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(new InvalidException(RsocketErrorCode.message("RST-700404")));
        }
        return localRequestResponse(routing, dataEncoding, compositeMetadata.getAcceptMimeTypesMetadata(), payload);
    }

    /**
     * Dispatches a fire-and-forget call to the local service.
     *
     * @param payload incoming request; released here when it cannot be dispatched
     * @return the local call's completion, or an {@link InvalidException} error when routing
     *     metadata (RST-600404) or data encoding (RST-700404) cannot be resolved
     */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(payload.metadata());
        GSVRoutingMetadata routing = getGsvRoutingMetadata(compositeMetadata);
        if (routing == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(new InvalidException(RsocketErrorCode.message("RST-600404")));
        }
        MessageMimeTypeMetadata dataEncoding = getDataEncodingMetadata(compositeMetadata);
        if (dataEncoding == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(new InvalidException(RsocketErrorCode.message("RST-700404")));
        }
        return localFireAndForget(routing, dataEncoding, payload);
    }

    /**
     * Publishes a CloudEvent to {@link #eventProcessor}.
     *
     * @param cloudEventImpl event to publish
     * @return a Mono that performs the publish when subscribed
     */
    @Override
    public Mono<Void> fireCloudEvent(CloudEventImpl<?> cloudEventImpl) {
        return Mono.fromRunnable(() -> this.eventProcessor.onNext(cloudEventImpl));
    }

    /**
     * Sends an event reply to the peer as a fire-and-forget frame.
     *
     * @param uri reply address passed to {@code constructEventReplyPayload}
     * @param eventReply reply to send
     * @return completion of the requester's fire-and-forget call
     */
    @Override
    public Mono<Void> fireEventReply(URI uri, EventReply eventReply) {
        Payload replyPayload = constructEventReplyPayload(uri, eventReply);
        return this.requester.fireAndForget(replyPayload);
    }

    /**
     * Dispatches a request/stream call to the local service.
     *
     * @param payload incoming request; released here when it cannot be dispatched
     * @return the local call's stream, or an {@link InvalidException} error when routing
     *     metadata (RST-600404) or data encoding (RST-700404) cannot be resolved
     */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(payload.metadata());
        GSVRoutingMetadata routing = getGsvRoutingMetadata(compositeMetadata);
        if (routing == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(new InvalidException(RsocketErrorCode.message("RST-600404")));
        }
        MessageMimeTypeMetadata dataEncoding = getDataEncodingMetadata(compositeMetadata);
        if (dataEncoding == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(new InvalidException(RsocketErrorCode.message("RST-700404")));
        }
        return localRequestStream(routing, dataEncoding, compositeMetadata.getAcceptMimeTypesMetadata(), payload);
    }

    /**
     * Dispatches a request/channel call to the local service.
     *
     * @param payload first frame of the channel, used for routing; released here when the
     *     call cannot be dispatched
     * @param publisher full inbound stream; its first element is skipped before it is handed
     *     to the local call, because {@code payload} is passed separately
     * @return the local call's stream, or an {@link InvalidException} error when routing
     *     metadata (RST-600404) or data encoding (RST-700404) cannot be resolved
     */
    @Override
    public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> publisher) {
        RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(payload.metadata());
        GSVRoutingMetadata routing = getGsvRoutingMetadata(compositeMetadata);
        if (routing == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(new InvalidException(RsocketErrorCode.message("RST-600404")));
        }
        MessageMimeTypeMetadata dataEncoding = getDataEncodingMetadata(compositeMetadata);
        if (dataEncoding == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(new InvalidException(RsocketErrorCode.message("RST-700404")));
        }
        Flux<Payload> remainingFrames = Flux.from(publisher).skip(1);
        return localRequestChannel(routing, dataEncoding, compositeMetadata.getAcceptMimeTypesMetadata(), payload, remainingFrames);
    }

    /**
     * Decodes a metadata-push frame as a CloudEvent and publishes it.
     *
     * <p>The metadata is read and decoded before the payload is released; the payload is always
     * released before this method returns. A frame with empty metadata is ignored. A decoding
     * failure is logged and otherwise ignored, so the peer is never sent an error for it.
     *
     * @param payload metadata-push frame; always released by this method
     * @return the publish operation for the decoded event, or an empty Mono when there is
     *     nothing to publish or decoding failed
     */
    @Override
    public Mono<Void> metadataPush(Payload payload) {
        try {
            if (payload.metadata().readableBytes() > 0) {
                // Decode eagerly: the returned Mono must not touch the payload, which is
                // released in the finally block below.
                CloudEventImpl<?> cloudEvent = (CloudEventImpl<?>) Json.decodeValue(payload.getMetadataUtf8(), CLOUD_EVENT_TYPE_REFERENCE);
                return fireCloudEvent(cloudEvent);
            }
        } catch (Exception e) {
            this.log.error(RsocketErrorCode.message(RsocketErrorCode.message("RST-610500", e.getMessage())), e);
        } finally {
            ReferenceCountUtil.safeRelease(payload);
        }
        return Mono.empty();
    }

    /**
     * Sends a CloudEvent to the peer as a metadata-push frame.
     *
     * @param cloudEventImpl event to send
     * @return completion of the requester's metadata push, or an error Mono when building or
     *     sending the payload throws
     */
    public Mono<Void> fireCloudEventToPeer(CloudEventImpl<?> cloudEventImpl) {
        try {
            Payload pushPayload = cloudEventToMetadataPushPayload(cloudEventImpl);
            return this.requester.metadataPush(pushPayload);
        } catch (Exception e) {
            return Mono.error(e);
        }
    }

    /**
     * @return a Mono that completes when either this responder or the requester closes
     */
    @Override
    public Mono<Void> onClose() {
        return this.comboOnClose;
    }

    /**
     * Resolves the data encoding for a request.
     *
     * @param rSocketCompositeMetadata composite metadata of the request
     * @return the request's own encoding metadata, else {@link #defaultMessageMimeType},
     *     which may be {@code null}
     */
    @Nullable
    private MessageMimeTypeMetadata getDataEncodingMetadata(RSocketCompositeMetadata rSocketCompositeMetadata) {
        MessageMimeTypeMetadata requestEncoding = rSocketCompositeMetadata.getDataEncodingMetadata();
        if (requestEncoding != null) {
            return requestEncoding;
        }
        return this.defaultMessageMimeType;
    }

    /**
     * Resolves the routing target for a request.
     *
     * <p>The regular routing metadata wins when present; otherwise the routing text carried
     * by binary routing metadata is parsed as UTF-8.
     *
     * @param rSocketCompositeMetadata composite metadata of the request
     * @return the routing metadata, or {@code null} when neither form is present
     */
    @Nullable
    private GSVRoutingMetadata getGsvRoutingMetadata(RSocketCompositeMetadata rSocketCompositeMetadata) {
        // Both lookups are performed up front, in the same order as before.
        BinaryRoutingMetadata binaryRouting = rSocketCompositeMetadata.getBinaryRoutingMetadata();
        GSVRoutingMetadata routing = rSocketCompositeMetadata.getRoutingMetaData();
        if (routing != null || binaryRouting == null) {
            return routing;
        }
        String routingText = new String(binaryRouting.getRoutingText(), StandardCharsets.UTF_8);
        return GSVRoutingMetadata.from(routingText);
    }
}
