package com.alibaba.rsocket.invocation;

import com.alibaba.rsocket.MutableContext;
import com.alibaba.rsocket.ServiceLocator;
import com.alibaba.rsocket.encoding.RSocketEncodingFacade;
import com.alibaba.rsocket.metadata.MessageMimeTypeMetadata;
import com.alibaba.rsocket.metadata.RSocketCompositeMetadata;
import com.alibaba.rsocket.metadata.RSocketMimeType;
import com.alibaba.rsocket.observability.RsocketErrorCode;
import com.alibaba.rsocket.upstream.UpstreamManager;
import io.micrometer.core.instrument.Metrics;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.rsocket.Payload;
import io.rsocket.frame.FrameType;
import io.rsocket.util.ByteBufPayload;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import kotlin.coroutines.Continuation;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
import net.bytebuddy.implementation.bind.annotation.This;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/alibaba-rsocket-core-1.1.6.jar:com/alibaba/rsocket/invocation/RSocketRequesterRpcProxy.class */
public class RSocketRequesterRpcProxy implements InvocationHandler {
    private static Logger log = LoggerFactory.getLogger((Class<?>) RSocketRequesterRpcProxy.class);
    protected UpstreamManager upstreamManager;
    protected Class<?> serviceInterface;
    protected String group;
    protected String service;
    protected String version;
    protected String serviceId;
    protected String endpoint;
    protected boolean sticky;
    private URI sourceUri;
    protected RSocketMimeType encodingType;
    protected RSocketMimeType[] acceptEncodingTypes;
    protected Duration timeout;
    protected RSocketEncodingFacade encodingFacade = RSocketEncodingFacade.getInstance();
    protected Map<Method, ReactiveMethodMetadata> methodMetadataMap = new ConcurrentHashMap();
    private boolean jdkProxy;

    public RSocketRequesterRpcProxy(UpstreamManager upstreamManager, String str, Class<?> cls, @Nullable String str2, String str3, RSocketMimeType rSocketMimeType, @Nullable RSocketMimeType rSocketMimeType2, Duration duration, @Nullable String str4, boolean z, URI uri, boolean z2) {
        this.upstreamManager = upstreamManager;
        this.serviceInterface = cls;
        this.service = cls.getCanonicalName();
        if (str2 != null && !str2.isEmpty()) {
            this.service = str2;
        }
        this.group = str;
        this.version = str3;
        this.serviceId = ServiceLocator.serviceId(this.group, this.service, this.version);
        this.endpoint = str4;
        this.sticky = z;
        this.sourceUri = uri;
        this.encodingType = rSocketMimeType;
        if (rSocketMimeType2 == null) {
            this.acceptEncodingTypes = defaultAcceptEncodingTypes();
        } else {
            this.acceptEncodingTypes = new RSocketMimeType[]{rSocketMimeType2};
        }
        this.timeout = duration;
        this.jdkProxy = z2;
    }

    @Override // java.lang.reflect.InvocationHandler
    @RuntimeType
    public Object invoke(@This Object obj, @Origin Method method, @AllArguments Object[] objArr) throws Throwable {
        Payload create;
        Flux flux;
        if (this.jdkProxy && method.isDefault()) {
            return DefaultMethodHandler.getMethodHandle(method, this.serviceInterface).bindTo(obj).invokeWithArguments(objArr);
        }
        if (method.getDeclaringClass().equals(Object.class)) {
            return method.invoke(this, new Object[0]);
        }
        MutableContext mutableContext = new MutableContext();
        if (!this.methodMetadataMap.containsKey(method)) {
            this.methodMetadataMap.put(method, new ReactiveMethodMetadata(this.group, this.service, this.version, method, this.encodingType, this.acceptEncodingTypes, this.endpoint, this.sticky, this.sourceUri));
        }
        ReactiveMethodMetadata reactiveMethodMetadata = this.methodMetadataMap.get(method);
        mutableContext.put(ReactiveMethodMetadata.class, reactiveMethodMetadata);
        Object[] objArr2 = objArr;
        if (reactiveMethodMetadata.isKotlinSuspend()) {
            objArr2 = Arrays.copyOfRange(objArr2, 0, objArr2.length - 1);
            mutableContext.put(Continuation.class, objArr[objArr.length - 1]);
        }
        if (reactiveMethodMetadata.getRsocketFrameType() == FrameType.REQUEST_CHANNEL) {
            metrics(reactiveMethodMetadata);
            if (objArr2.length == 1) {
                create = ByteBufPayload.create(Unpooled.EMPTY_BUFFER, reactiveMethodMetadata.getCompositeMetadataByteBuf().retainedDuplicate());
                flux = reactiveMethodMetadata.getReactiveAdapter().toFlux(objArr2[0]);
            } else {
                create = ByteBufPayload.create(this.encodingFacade.encodingResult(objArr2[0], reactiveMethodMetadata.getParamEncoding()), reactiveMethodMetadata.getCompositeMetadataByteBuf().retainedDuplicate());
                flux = reactiveMethodMetadata.getReactiveAdapter().toFlux(objArr2[1]);
            }
            Flux<V> concatMap = this.upstreamManager.getRSocket(this.serviceId).requestChannel(flux.startWith(create).map(obj2 -> {
                return obj2 instanceof Payload ? (Payload) obj2 : ByteBufPayload.create(this.encodingFacade.encodingResult(obj2, this.encodingType), reactiveMethodMetadata.getCompositeMetadataByteBuf().retainedDuplicate());
            })).concatMap(payload -> {
                try {
                    return Mono.justOrEmpty(this.encodingFacade.decodeResult(extractPayloadDataMimeType(RSocketCompositeMetadata.from(payload.metadata()), this.encodingType), payload.data(), reactiveMethodMetadata.getInferredClassForReturn()));
                } catch (Exception e) {
                    return Flux.error(e);
                }
            });
            Objects.requireNonNull(mutableContext);
            Flux<?> contextWrite = concatMap.contextWrite(mutableContext::putAll);
            return reactiveMethodMetadata.isMonoChannel() ? contextWrite.last() : reactiveMethodMetadata.getReactiveAdapter().fromPublisher(contextWrite, method.getReturnType());
        }
        ByteBuf encodingParams = this.encodingFacade.encodingParams(objArr2, reactiveMethodMetadata.getParamEncoding());
        Class<?> returnType = method.getReturnType();
        if (reactiveMethodMetadata.getRsocketFrameType() == FrameType.REQUEST_RESPONSE) {
            metrics(reactiveMethodMetadata);
            return reactiveMethodMetadata.getReactiveAdapter().fromPublisher(remoteRequestResponse(reactiveMethodMetadata, reactiveMethodMetadata.getCompositeMetadataByteBuf().retainedDuplicate(), encodingParams).handle((payload2, synchronousSink) -> {
                try {
                    Object decodeResult = this.encodingFacade.decodeResult(extractPayloadDataMimeType(RSocketCompositeMetadata.from(payload2.metadata()), this.encodingType), payload2.data(), reactiveMethodMetadata.getInferredClassForReturn());
                    if (decodeResult != null) {
                        synchronousSink.next(decodeResult);
                    }
                    synchronousSink.complete();
                } catch (Exception e) {
                    synchronousSink.error(e);
                }
            }), returnType, mutableContext);
        }
        if (reactiveMethodMetadata.getRsocketFrameType() == FrameType.REQUEST_FNF) {
            metrics(reactiveMethodMetadata);
            return remoteFireAndForget(reactiveMethodMetadata, reactiveMethodMetadata.getCompositeMetadataByteBuf().retainedDuplicate(), encodingParams);
        }
        if (reactiveMethodMetadata.getRsocketFrameType() == FrameType.REQUEST_STREAM) {
            metrics(reactiveMethodMetadata);
            return reactiveMethodMetadata.getReactiveAdapter().fromPublisher(remoteRequestStream(reactiveMethodMetadata, reactiveMethodMetadata.getCompositeMetadataByteBuf().retainedDuplicate(), encodingParams).concatMap(payload3 -> {
                try {
                    return Mono.justOrEmpty(this.encodingFacade.decodeResult(extractPayloadDataMimeType(RSocketCompositeMetadata.from(payload3.metadata()), this.encodingType), payload3.data(), reactiveMethodMetadata.getInferredClassForReturn()));
                } catch (Exception e) {
                    return Mono.error(e);
                }
            }), returnType, mutableContext);
        }
        ReferenceCountUtil.safeRelease(encodingParams);
        return Mono.error(new Exception(RsocketErrorCode.message("RST-200405", reactiveMethodMetadata.getRsocketFrameType())));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Flux<Payload> remoteRequestStream(ReactiveMethodMetadata reactiveMethodMetadata, ByteBuf byteBuf, ByteBuf byteBuf2) {
        return this.upstreamManager.getRSocket(this.serviceId).requestStream(ByteBufPayload.create(byteBuf2, byteBuf));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<Void> remoteFireAndForget(ReactiveMethodMetadata reactiveMethodMetadata, ByteBuf byteBuf, ByteBuf byteBuf2) {
        return this.upstreamManager.getRSocket(this.serviceId).fireAndForget(ByteBufPayload.create(byteBuf2, byteBuf));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @NotNull
    public Mono<Payload> remoteRequestResponse(ReactiveMethodMetadata reactiveMethodMetadata, ByteBuf byteBuf, ByteBuf byteBuf2) {
        return this.upstreamManager.getRSocket(this.serviceId).requestResponse(ByteBufPayload.create(byteBuf2, byteBuf)).name(reactiveMethodMetadata.getFullName()).metrics().timeout(this.timeout).doOnError(TimeoutException.class, timeoutException -> {
            timeOutMetrics(reactiveMethodMetadata);
            log.error(RsocketErrorCode.message("RST-200503", reactiveMethodMetadata.getFullName(), this.timeout));
        });
    }

    protected void metrics(ReactiveMethodMetadata reactiveMethodMetadata) {
        Metrics.counter(this.service, reactiveMethodMetadata.getMetricsTags()).increment();
    }

    protected void timeOutMetrics(ReactiveMethodMetadata reactiveMethodMetadata) {
        Metrics.counter("rsocket.timeout.error", reactiveMethodMetadata.getMetricsTags()).increment();
    }

    private RSocketMimeType extractPayloadDataMimeType(RSocketCompositeMetadata rSocketCompositeMetadata, RSocketMimeType rSocketMimeType) {
        return rSocketCompositeMetadata.contains(RSocketMimeType.MessageMimeType) ? MessageMimeTypeMetadata.from(rSocketCompositeMetadata.getMetadata(RSocketMimeType.MessageMimeType)).getRSocketMimeType() : rSocketMimeType;
    }

    public RSocketMimeType[] defaultAcceptEncodingTypes() {
        return new RSocketMimeType[]{RSocketMimeType.Hessian, RSocketMimeType.Java_Object, RSocketMimeType.Json, RSocketMimeType.Protobuf, RSocketMimeType.Avor, RSocketMimeType.CBOR, RSocketMimeType.Text, RSocketMimeType.Binary};
    }
}
