package com.alibaba.rsocket.upstream;

import com.alibaba.rsocket.RSocketRequesterSupport;
import com.alibaba.rsocket.discovery.DiscoveryService;
import com.alibaba.rsocket.invocation.RSocketRequesterRpcProxy;
import com.alibaba.rsocket.metadata.RSocketMimeType;
import com.alibaba.rsocket.observability.RsocketErrorCode;
import io.rsocket.RSocket;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

/* loaded from: input_file:BOOT-INF/lib/alibaba-rsocket-core-1.1.6.jar:com/alibaba/rsocket/upstream/UpstreamManagerImpl.class */
public class UpstreamManagerImpl implements UpstreamManager {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) UpstreamManagerImpl.class);
    private UpstreamCluster brokerCluster;
    private DiscoveryService brokerDiscoveryService;
    private RSocketRequesterSupport rsocketRequesterSupport;
    private List<String> p2pServices;
    private final Map<String, UpstreamCluster> clusters = new HashMap();
    private int status = 0;

    public UpstreamManagerImpl(RSocketRequesterSupport rSocketRequesterSupport) {
        this.rsocketRequesterSupport = rSocketRequesterSupport;
    }

    @Override // com.alibaba.rsocket.upstream.UpstreamManager
    public void add(UpstreamCluster upstreamCluster) {
        this.clusters.put(upstreamCluster.getServiceId(), upstreamCluster);
        if (upstreamCluster.isBroker()) {
            this.brokerCluster = upstreamCluster;
        }
    }

    @Override // com.alibaba.rsocket.upstream.UpstreamManager
    public void remove(UpstreamCluster upstreamCluster) {
        this.clusters.remove(upstreamCluster.getServiceId());
    }

    public void setP2pServices(List<String> list) {
        this.p2pServices = list;
    }

    public List<String> getP2pServices() {
        return this.p2pServices;
    }

    @Override // com.alibaba.rsocket.upstream.UpstreamManager
    public void addP2pService(String str) {
        if (this.p2pServices == null) {
            this.p2pServices = new ArrayList();
        }
        this.p2pServices.add(str);
    }

    @Override // com.alibaba.rsocket.upstream.UpstreamManager
    public Collection<UpstreamCluster> findAllClusters() {
        return this.clusters.values();
    }

    @Override // com.alibaba.rsocket.upstream.UpstreamManager
    public UpstreamCluster findClusterByServiceId(String str) {
        return this.clusters.get(str);
    }

    @Override // com.alibaba.rsocket.upstream.UpstreamManager
    public UpstreamCluster findBroker() {
        return this.brokerCluster;
    }

    @Override // com.alibaba.rsocket.upstream.UpstreamManager
    public DiscoveryService findBrokerDiscoveryService() {
        if (this.brokerCluster != null && this.brokerDiscoveryService == null) {
            this.brokerDiscoveryService = (DiscoveryService) Proxy.newProxyInstance(DiscoveryService.class.getClassLoader(), new Class[]{DiscoveryService.class}, new RSocketRequesterRpcProxy(this, "", DiscoveryService.class, DiscoveryService.class.getCanonicalName(), "", RSocketMimeType.Hessian, RSocketMimeType.Hessian, Duration.ofMillis(3000L), null, false, this.rsocketRequesterSupport.originUri(), true));
        }
        return this.brokerDiscoveryService;
    }

    @Override // com.alibaba.rsocket.upstream.UpstreamManager
    public RSocket getRSocket(String str) {
        return this.clusters.getOrDefault(str, this.brokerCluster).getLoadBalancedRSocket();
    }

    @Override // com.alibaba.rsocket.upstream.UpstreamManager
    public RSocketRequesterSupport requesterSupport() {
        return this.rsocketRequesterSupport;
    }

    @Override // com.alibaba.rsocket.upstream.UpstreamManager
    public void refresh(String str, List<String> list) {
        this.clusters.get(str).setUris(list);
    }

    @Override // com.alibaba.rsocket.upstream.UpstreamManager, com.alibaba.rsocket.Initializable
    public void init() throws Exception {
        if (!this.clusters.isEmpty()) {
            this.brokerCluster.setRsocketAware(this.rsocketRequesterSupport);
            this.brokerCluster.init();
            for (UpstreamCluster upstreamCluster : this.clusters.values()) {
                if (!upstreamCluster.isBroker()) {
                    upstreamCluster.setRsocketAware(this.rsocketRequesterSupport);
                    upstreamCluster.init();
                }
            }
        }
        monitorClusters();
        this.status = 1;
    }

    @Override // com.alibaba.rsocket.upstream.UpstreamManager, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        for (UpstreamCluster upstreamCluster : this.clusters.values()) {
            try {
                upstreamCluster.close();
                log.info(RsocketErrorCode.message("RST-400002", upstreamCluster.getServiceId()));
            } catch (Exception e) {
                log.error(RsocketErrorCode.message("RST-400001", new Object[0]), (Throwable) e);
            }
        }
    }

    public void monitorClusters() {
        if (this.status != 0 || this.brokerCluster == null) {
            return;
        }
        if (!this.brokerCluster.isLocal()) {
            Flux.interval(Duration.ofSeconds(120L)).flatMap(l -> {
                return findBrokerDiscoveryService().getInstances("*");
            }).map(list -> {
                return (List) list.stream().map((v0) -> {
                    return v0.getUri();
                }).collect(Collectors.toList());
            }).subscribe(list2 -> {
                this.brokerCluster.setUris(list2);
            });
        }
        if (this.p2pServices == null || this.p2pServices.isEmpty()) {
            return;
        }
        Flux.interval(Duration.ofSeconds(120L)).flatMap(l2 -> {
            return Flux.fromIterable(this.p2pServices).flatMap(str -> {
                return findBrokerDiscoveryService().getInstances(str).map(list3 -> {
                    return Tuples.of(str, (List) list3.stream().map((v0) -> {
                        return v0.getUri();
                    }).collect(Collectors.toList()));
                });
            });
        }).subscribe((Consumer<? super R>) tuple2 -> {
            UpstreamCluster upstreamCluster;
            List<String> list3 = (List) tuple2.getT2();
            if (list3.isEmpty() || (upstreamCluster = this.clusters.get(tuple2.getT1())) == null) {
                return;
            }
            upstreamCluster.setUris(list3);
        });
    }
}
