package com.alibaba.rsocket.client;

import com.alibaba.rsocket.ServiceLocator;
import com.alibaba.rsocket.cloudevents.CloudEventImpl;
import com.alibaba.rsocket.events.CloudEventsProcessor;
import com.alibaba.rsocket.events.ServicesExposedEvent;
import com.alibaba.rsocket.events.ServicesHiddenEvent;
import com.alibaba.rsocket.health.RSocketServiceHealth;
import com.alibaba.rsocket.invocation.RSocketRemoteServiceBuilder;
import com.alibaba.rsocket.metadata.RSocketMimeType;
import com.alibaba.rsocket.observability.RsocketErrorCode;
import com.alibaba.rsocket.rpc.LocalReactiveServiceCaller;
import com.alibaba.rsocket.upstream.UpstreamCluster;
import com.alibaba.rsocket.upstream.UpstreamClusterChangedEventConsumer;
import com.alibaba.rsocket.upstream.UpstreamManager;
import com.alibaba.rsocket.upstream.UpstreamManagerImpl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.propertyeditors.StringArrayPropertyEditor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:BOOT-INF/lib/alibaba-rsocket-core-1.1.6.jar:com/alibaba/rsocket/client/RSocketBrokerClient.class */
/**
 * Client-side facade for talking to a set of RSocket brokers.
 *
 * <p>On construction it registers a built-in {@link RSocketServiceHealth} provider on the supplied
 * {@link LocalReactiveServiceCaller}, builds the requester support and wires an
 * {@link UpstreamManager} holding a single {@code "*"} cluster that points at the broker list.
 * Afterwards local services can be registered ({@link #addService}), announced to the brokers
 * ({@link #publishServices}), withdrawn ({@link #removeService}), and remote service proxies can
 * be created ({@link #buildService(Class)}).
 */
public class RSocketBrokerClient {
    private static final Logger log = LoggerFactory.getLogger(RSocketBrokerClient.class);

    /** Broker list used for the upstream cluster and for log output. */
    private final List<String> brokers;
    /** Application name handed to the requester support. */
    private final String appName;
    /** MIME type used both as encoding and accepted encoding of built service proxies. */
    private final RSocketMimeType dataMimeType;
    /** Assigned by {@link #initUpstreamManager()} during construction. */
    private UpstreamManager upstreamManager;
    /** Registry of locally provided services. */
    private final LocalReactiveServiceCaller serviceCaller;
    /** Sink shared by the requester support and the CloudEvents processor. */
    private final Sinks.Many<CloudEventImpl> eventProcessor = Sinks.many().multicast().onBackpressureBuffer();
    private final SimpleRSocketRequesterSupport rsocketRequesterSupport;
    private final CloudEventsProcessor cloudEventsProcessor;

    /**
     * Creates the client and initializes the upstream manager.
     *
     * @param str application name
     * @param list broker list; the same list instance is kept and passed to the upstream cluster
     * @param str2 value passed to {@code setTopology} on the requester support
     * @param map value passed to {@code setMetadata} on the requester support
     * @param rSocketMimeType data MIME type used for service proxies built by this client
     * @param cArr credential characters forwarded to the requester support
     *     (NOTE(review): presumably a JWT token; confirm against SimpleRSocketRequesterSupport)
     * @param localReactiveServiceCaller registry that receives the health provider and all
     *     services added through {@link #addService}
     */
    public RSocketBrokerClient(String str, List<String> list, String str2, Map<String, String> map, RSocketMimeType rSocketMimeType, char[] cArr, LocalReactiveServiceCaller localReactiveServiceCaller) {
        this.appName = str;
        this.brokers = list;
        this.dataMimeType = rSocketMimeType;
        this.serviceCaller = localReactiveServiceCaller;
        // Built-in health provider that answers every check with 1.
        // NOTE(review): 1 is presumably the "serving" status; confirm against RSocketServiceHealth.
        // The explicit cast gives the lambda a target type; the handler slot also accepts a plain Object.
        this.serviceCaller.addProvider("", RSocketServiceHealth.class.getCanonicalName(), "", RSocketServiceHealth.class,
                (RSocketServiceHealth) serviceName -> Mono.just(1));
        this.rsocketRequesterSupport = new SimpleRSocketRequesterSupport(str, cArr, this.brokers, this.serviceCaller, this.eventProcessor);
        this.rsocketRequesterSupport.setTopology(str2);
        this.rsocketRequesterSupport.setMetadata(map);
        this.cloudEventsProcessor = new CloudEventsProcessor(this.eventProcessor, new ArrayList<>());
        initUpstreamManager();
    }

    /**
     * Creates the upstream manager, registers the {@code "*"} cluster for the brokers and
     * initializes it. On success a consumer for upstream-cluster-changed events is attached to
     * the CloudEvents processor. An initialization failure is logged and not rethrown, so the
     * constructor still completes.
     */
    private void initUpstreamManager() {
        this.upstreamManager = new UpstreamManagerImpl(this.rsocketRequesterSupport);
        this.upstreamManager.add(new UpstreamCluster(null, "*", null, this.brokers));
        try {
            this.upstreamManager.init();
            this.cloudEventsProcessor.addConsumer(new UpstreamClusterChangedEventConsumer(this.upstreamManager));
        } catch (Exception e) {
            // Report through the logger, with the broker list, instead of printing to stderr.
            log.error("Failed to initialize the upstream manager for brokers {}", this.brokers, e);
        }
    }

    /**
     * Registers a local service implementation with empty group and version.
     *
     * @param str service name
     * @param cls service interface
     * @param obj service implementation
     * @return this client, to allow chaining
     */
    public RSocketBrokerClient addService(String str, Class<?> cls, Object obj) {
        this.serviceCaller.addProvider("", str, "", cls, obj);
        return this;
    }

    /**
     * Sends the services-exposed event to the upstream brokers and, once the send succeeds,
     * logs the exposed services. Does nothing when the requester support supplies no event.
     * The send is subscribed to here and is not awaited.
     */
    public void publishServices() {
        CloudEventImpl<ServicesExposedEvent> exposedEvent = this.rsocketRequesterSupport.servicesExposedEvent().get();
        if (exposedEvent == null) {
            return;
        }
        this.upstreamManager.findBroker().getLoadBalancedRSocket()
                .fireCloudEventToUpstreamAll(exposedEvent)
                .doOnSuccess(ignored -> {
                    String exposedServices = this.rsocketRequesterSupport.exposedServices().get().stream()
                            .map(locator -> locator.getGsv())
                            .collect(Collectors.joining(","));
                    log.info(RsocketErrorCode.message("RST-301201", exposedServices, this.brokers));
                })
                .subscribe();
    }

    /**
     * Sends a services-hidden event for the given service to the upstream brokers and removes
     * the local provider once that send succeeds. The send is subscribed to here and is not
     * awaited.
     *
     * @param str service name
     * @param cls service interface
     */
    public void removeService(String str, Class<?> cls) {
        List<ServiceLocator> hiddenServices = Collections.singletonList(new ServiceLocator("", str, ""));
        this.upstreamManager.findBroker().getLoadBalancedRSocket()
                .fireCloudEventToUpstreamAll(ServicesHiddenEvent.convertServicesToCloudEvent(hiddenServices))
                .doOnSuccess(ignored -> this.serviceCaller.removeProvider("", str, "", cls))
                .subscribe();
    }

    /** Closes the upstream manager. */
    public void dispose() {
        this.upstreamManager.close();
    }

    /**
     * Builds a remote service proxy whose service name is the interface's canonical name.
     *
     * @param cls service interface
     * @param <T> service interface type
     * @return the proxy produced by {@link RSocketRemoteServiceBuilder}
     */
    public <T> T buildService(Class<T> cls) {
        return buildService(cls, cls.getCanonicalName());
    }

    /**
     * Builds a remote service proxy for an explicitly named service, using this client's data
     * MIME type for both encoding and accepted encoding.
     *
     * @param cls service interface
     * @param str service name
     * @param <T> service interface type
     * @return the proxy produced by {@link RSocketRemoteServiceBuilder}
     */
    public <T> T buildService(Class<T> cls, String str) {
        return RSocketRemoteServiceBuilder.client(cls)
                .service(str)
                .encodingType(this.dataMimeType)
                .acceptEncodingType(this.dataMimeType)
                .upstreamManager(this.upstreamManager)
                .build();
    }
}
