package com.alibaba.rsocket.listen;

import com.alibaba.rsocket.encoding.RSocketEncodingFacade;
import com.alibaba.rsocket.metadata.GSVRoutingMetadata;
import com.alibaba.rsocket.metadata.MessageAcceptMimeTypesMetadata;
import com.alibaba.rsocket.metadata.MessageMimeTypeMetadata;
import com.alibaba.rsocket.metadata.RSocketMimeType;
import com.alibaba.rsocket.observability.RsocketErrorCode;
import com.alibaba.rsocket.rpc.LocalReactiveServiceCaller;
import com.alibaba.rsocket.rpc.ReactiveMethodHandler;
import io.netty.util.ReferenceCountUtil;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.exceptions.InvalidException;
import io.rsocket.util.ByteBufPayload;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/alibaba-rsocket-core-1.0.0.M1.jar:com/alibaba/rsocket/listen/RSocketResponderSupport.class */
public abstract class RSocketResponderSupport extends AbstractRSocket {
    protected LocalReactiveServiceCaller localServiceCaller;
    protected Logger log = LoggerFactory.getLogger(getClass());
    protected RSocketEncodingFacade encodingFacade = RSocketEncodingFacade.getInstance();

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<Payload> localRequestResponse(GSVRoutingMetadata gSVRoutingMetadata, MessageMimeTypeMetadata messageMimeTypeMetadata, @Nullable MessageAcceptMimeTypesMetadata messageAcceptMimeTypesMetadata, Payload payload) {
        try {
            ReactiveMethodHandler invokeMethod = this.localServiceCaller.getInvokeMethod(gSVRoutingMetadata.getService(), gSVRoutingMetadata.getMethod());
            if (invokeMethod == null) {
                ReferenceCountUtil.safeRelease(payload);
                return Mono.error(new InvalidException(RsocketErrorCode.message("RST-201404", gSVRoutingMetadata.getService(), gSVRoutingMetadata.getMethod())));
            }
            Object invokeLocalService = invokeMethod.isAsyncReturn() ? invokeLocalService(invokeMethod, messageMimeTypeMetadata, payload) : Mono.fromCallable(() -> {
                return invokeLocalService(invokeMethod, messageMimeTypeMetadata, payload);
            });
            RSocketMimeType resultEncodingType = resultEncodingType(messageAcceptMimeTypesMetadata, messageMimeTypeMetadata.getRSocketMimeType(), invokeMethod);
            return (invokeLocalService instanceof Mono ? (Mono) invokeLocalService : invokeMethod.getReactiveAdapter().toMono(invokeLocalService)).map(obj -> {
                return this.encodingFacade.encodingResult(obj, resultEncodingType);
            }).map(byteBuf -> {
                return ByteBufPayload.create(byteBuf, this.encodingFacade.getDefaultCompositeMetadataByteBuf(resultEncodingType).retainedDuplicate());
            });
        } catch (Exception e) {
            this.log.error(RsocketErrorCode.message("RST-200500", new Object[0]), (Throwable) e);
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(new InvalidException(RsocketErrorCode.message("RST-900500", e.getMessage())));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<Void> localFireAndForget(GSVRoutingMetadata gSVRoutingMetadata, MessageMimeTypeMetadata messageMimeTypeMetadata, Payload payload) {
        ReactiveMethodHandler invokeMethod = this.localServiceCaller.getInvokeMethod(gSVRoutingMetadata.getService(), gSVRoutingMetadata.getMethod());
        if (invokeMethod == null) {
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(new InvalidException(RsocketErrorCode.message("RST-201404", gSVRoutingMetadata.getService(), gSVRoutingMetadata.getMethod())));
        }
        if (!invokeMethod.isAsyncReturn()) {
            return Mono.create(monoSink -> {
                try {
                    invokeLocalService(invokeMethod, messageMimeTypeMetadata, payload);
                    monoSink.success();
                } catch (Exception e) {
                    this.log.error(RsocketErrorCode.message("RST-200500", new Object[0]), (Throwable) e);
                    monoSink.error(e);
                }
            });
        }
        try {
            return invokeMethod.getReactiveAdapter().toMono(invokeLocalService(invokeMethod, messageMimeTypeMetadata, payload));
        } catch (Exception e) {
            ReferenceCountUtil.safeRelease(payload);
            this.log.error(RsocketErrorCode.message("RST-200500", new Object[0]), (Throwable) e);
            return Mono.error(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Flux<Payload> localRequestStream(GSVRoutingMetadata gSVRoutingMetadata, MessageMimeTypeMetadata messageMimeTypeMetadata, @Nullable MessageAcceptMimeTypesMetadata messageAcceptMimeTypesMetadata, Payload payload) {
        try {
            ReactiveMethodHandler invokeMethod = this.localServiceCaller.getInvokeMethod(gSVRoutingMetadata.getService(), gSVRoutingMetadata.getMethod());
            if (invokeMethod == null) {
                ReferenceCountUtil.safeRelease(payload);
                return Flux.error(new InvalidException(RsocketErrorCode.message("RST-201404", gSVRoutingMetadata.getService(), gSVRoutingMetadata.getMethod())));
            }
            Object invokeLocalService = invokeLocalService(invokeMethod, messageMimeTypeMetadata, payload);
            Flux flux = invokeLocalService instanceof Flux ? (Flux) invokeLocalService : invokeMethod.getReactiveAdapter().toFlux(invokeLocalService);
            RSocketMimeType resultEncodingType = resultEncodingType(messageAcceptMimeTypesMetadata, messageMimeTypeMetadata.getRSocketMimeType(), invokeMethod);
            return flux.map(obj -> {
                return this.encodingFacade.encodingResult(obj, resultEncodingType);
            }).map(byteBuf -> {
                return ByteBufPayload.create(byteBuf, this.encodingFacade.getDefaultCompositeMetadataByteBuf(resultEncodingType).retainedDuplicate());
            });
        } catch (Exception e) {
            this.log.error(RsocketErrorCode.message("RST-200500", new Object[0]), (Throwable) e);
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(new InvalidException(RsocketErrorCode.message("RST-900500", e.getMessage())));
        }
    }

    @Override // io.rsocket.AbstractRSocket, io.rsocket.RSocket
    public final Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return Flux.error(new InvalidException(RsocketErrorCode.message("RST-201400", new Object[0])));
    }

    public Flux<Payload> localRequestChannel(GSVRoutingMetadata gSVRoutingMetadata, MessageMimeTypeMetadata messageMimeTypeMetadata, @Nullable MessageAcceptMimeTypesMetadata messageAcceptMimeTypesMetadata, Payload payload, Flux<Payload> flux) {
        try {
            ReactiveMethodHandler invokeMethod = this.localServiceCaller.getInvokeMethod(gSVRoutingMetadata.getService(), gSVRoutingMetadata.getMethod());
            if (invokeMethod == null) {
                return Flux.error(new InvalidException(RsocketErrorCode.message("RST-201404", gSVRoutingMetadata.getService(), gSVRoutingMetadata.getMethod())));
            }
            Object invoke = invokeMethod.getParamCount() == 1 ? invokeMethod.invoke(flux.map(payload2 -> {
                return this.encodingFacade.decodeResult(messageMimeTypeMetadata.getRSocketMimeType(), payload2.data(), invokeMethod.getInferredClassForParameter(0));
            })) : invokeMethod.invoke(this.encodingFacade.decodeResult(messageMimeTypeMetadata.getRSocketMimeType(), payload.data(), invokeMethod.getParameterTypes()[0]), flux.map(payload3 -> {
                return this.encodingFacade.decodeResult(messageMimeTypeMetadata.getRSocketMimeType(), payload3.data(), invokeMethod.getInferredClassForParameter(1));
            }));
            RSocketMimeType resultEncodingType = resultEncodingType(messageAcceptMimeTypesMetadata, messageMimeTypeMetadata.getRSocketMimeType(), invokeMethod);
            return ((Flux) invoke).map(obj -> {
                return this.encodingFacade.encodingResult(obj, resultEncodingType);
            }).map(byteBuf -> {
                return ByteBufPayload.create(byteBuf, this.encodingFacade.getDefaultCompositeMetadataByteBuf(resultEncodingType).retainedDuplicate());
            });
        } catch (Exception e) {
            this.log.error(RsocketErrorCode.message("RST-200500", new Object[0]), (Throwable) e);
            return Flux.error(new InvalidException(RsocketErrorCode.message("RST-900500", e.getMessage())));
        }
    }

    @Nullable
    protected Object invokeLocalService(ReactiveMethodHandler reactiveMethodHandler, MessageMimeTypeMetadata messageMimeTypeMetadata, Payload payload) throws Exception {
        Object invoke;
        if (reactiveMethodHandler.getParamCount() > 0) {
            Object decodeParams = this.encodingFacade.decodeParams(messageMimeTypeMetadata.getRSocketMimeType(), payload.data(), reactiveMethodHandler.getParameterTypes());
            invoke = decodeParams instanceof Object[] ? reactiveMethodHandler.invoke((Object[]) decodeParams) : reactiveMethodHandler.invoke(decodeParams);
        } else {
            invoke = reactiveMethodHandler.invoke(new Object[0]);
        }
        return invoke;
    }

    private RSocketMimeType resultEncodingType(@Nullable MessageAcceptMimeTypesMetadata messageAcceptMimeTypesMetadata, RSocketMimeType rSocketMimeType, ReactiveMethodHandler reactiveMethodHandler) {
        RSocketMimeType firstAcceptType;
        return reactiveMethodHandler.isBinaryReturn() ? RSocketMimeType.Binary : (messageAcceptMimeTypesMetadata == null || (firstAcceptType = messageAcceptMimeTypesMetadata.getFirstAcceptType()) == null) ? rSocketMimeType : firstAcceptType;
    }
}
