package com.alibaba.spring.boot.rsocket;

import com.alibaba.rsocket.RSocketAppContext;
import com.alibaba.rsocket.RSocketRequesterSupport;
import com.alibaba.rsocket.ServiceLocator;
import com.alibaba.rsocket.cloudevents.CloudEventImpl;
import com.alibaba.rsocket.cloudevents.RSocketCloudEventBuilder;
import com.alibaba.rsocket.events.AppStatusEvent;
import com.alibaba.rsocket.events.ServicesExposedEvent;
import com.alibaba.rsocket.events.ServicesHiddenEvent;
import com.alibaba.rsocket.health.RSocketServiceHealth;
import com.alibaba.rsocket.invocation.RSocketRemoteServiceBuilder;
import com.alibaba.rsocket.loadbalance.LoadBalancedRSocket;
import com.alibaba.rsocket.upstream.UpstreamCluster;
import com.alibaba.rsocket.upstream.UpstreamManager;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.boot.actuate.endpoint.annotation.Selector;
import org.springframework.boot.actuate.endpoint.annotation.WriteOperation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Endpoint(id = "rsocket")
/* loaded from: input_file:BOOT-INF/lib/alibaba-rsocket-spring-boot-starter-1.1.6.jar:com/alibaba/spring/boot/rsocket/RSocketEndpoint.class */
public class RSocketEndpoint {
    private RSocketProperties properties;
    private RSocketRequesterSupport rsocketRequesterSupport;
    private UpstreamManager upstreamManager;
    private Integer rsocketServiceStatus = AppStatusEvent.STATUS_SERVING;
    private Set<String> offlineServices = new HashSet();
    private boolean serviceProvider;

    public RSocketEndpoint(RSocketProperties rSocketProperties, UpstreamManager upstreamManager, RSocketRequesterSupport rSocketRequesterSupport) {
        this.serviceProvider = false;
        this.properties = rSocketProperties;
        this.upstreamManager = upstreamManager;
        this.rsocketRequesterSupport = rSocketRequesterSupport;
        if (rSocketRequesterSupport.exposedServices().get().isEmpty()) {
            return;
        }
        this.serviceProvider = true;
    }

    @ReadOperation
    public Map<String, Object> info() {
        HashMap hashMap = new HashMap();
        hashMap.put("id", RSocketAppContext.ID);
        hashMap.put("serviceStatus", AppStatusEvent.statusText(this.rsocketServiceStatus));
        if (this.serviceProvider) {
            hashMap.put("published", this.rsocketRequesterSupport.exposedServices().get());
        }
        if (!RSocketRemoteServiceBuilder.CONSUMED_SERVICES.isEmpty()) {
            hashMap.put("subscribed", this.rsocketRequesterSupport.subscribedServices().get().stream().filter(serviceLocator -> {
                return !RSocketServiceHealth.class.getCanonicalName().equals(serviceLocator.getService());
            }).collect(Collectors.toList()));
        }
        Collection<UpstreamCluster> findAllClusters = this.upstreamManager.findAllClusters();
        if (!findAllClusters.isEmpty()) {
            hashMap.put("upstreams", findAllClusters.stream().map(upstreamCluster -> {
                HashMap hashMap2 = new HashMap();
                hashMap2.put("service", upstreamCluster.getServiceId());
                hashMap2.put("uris", upstreamCluster.getUris());
                LoadBalancedRSocket loadBalancedRSocket = upstreamCluster.getLoadBalancedRSocket();
                hashMap2.put("activeUris", loadBalancedRSocket.getActiveSockets().keySet());
                if (!loadBalancedRSocket.getUnHealthyUriSet().isEmpty()) {
                    hashMap2.put("unHealthyUris", loadBalancedRSocket.getUnHealthyUriSet());
                }
                hashMap2.put("lastRefreshTimeStamp", new Date(loadBalancedRSocket.getLastRefreshTimeStamp()));
                hashMap2.put("lastHealthCheckTimeStamp", new Date(loadBalancedRSocket.getLastHealthCheckTimeStamp()));
                return hashMap2;
            }).collect(Collectors.toList()));
        }
        UpstreamCluster findClusterByServiceId = this.upstreamManager.findClusterByServiceId("*");
        if (findClusterByServiceId != null) {
            hashMap.put("brokers", findClusterByServiceId.getUris());
        }
        if (this.properties.getMetadata() != null && !this.properties.getMetadata().isEmpty()) {
            hashMap.put("metadata", this.properties.getMetadata());
        }
        if (!this.offlineServices.isEmpty()) {
            hashMap.put("offlineServices", this.offlineServices);
        }
        return hashMap;
    }

    @WriteOperation
    public Mono<String> operate(@Selector String str) {
        if ("online".equalsIgnoreCase(str)) {
            this.rsocketServiceStatus = AppStatusEvent.STATUS_SERVING;
            return sendAppStatus(this.rsocketServiceStatus).thenReturn("Succeed to register RSocket services on brokers!");
        }
        if (str.startsWith("online-")) {
            String substring = str.substring("online-".length());
            ServiceLocator findServiceLocator = findServiceLocator(substring);
            if (findServiceLocator == null) {
                return Mono.just("Service not found:  " + substring);
            }
            this.offlineServices.remove(substring);
            return sendRegisterService(findServiceLocator).thenReturn("Succeed to register " + substring + " on brokers!");
        }
        if ("offline".equalsIgnoreCase(str)) {
            this.rsocketServiceStatus = AppStatusEvent.STATUS_OUT_OF_SERVICE;
            return sendAppStatus(this.rsocketServiceStatus).thenReturn("Succeed to unregister RSocket services on brokers!");
        }
        if (str.startsWith("offline-")) {
            String substring2 = str.substring("offline-".length());
            ServiceLocator findServiceLocator2 = findServiceLocator(substring2);
            if (findServiceLocator2 == null) {
                return Mono.just("Service not found:  " + substring2);
            }
            this.offlineServices.add(substring2);
            return sendUnRegisterService(findServiceLocator2).thenReturn("Succeed to unregister " + substring2 + " on brokers!");
        }
        if ("shutdown".equalsIgnoreCase(str)) {
            this.rsocketServiceStatus = AppStatusEvent.STATUS_STOPPED;
            return sendAppStatus(this.rsocketServiceStatus).thenReturn("Succeed to unregister RSocket services on brokers! Please wait almost 60 seconds to shutdown the Spring Boot App!");
        }
        if (!"refreshUpstreams".equalsIgnoreCase(str)) {
            return Mono.just("Unknown action, please use online, offline and shutdown");
        }
        Iterator<UpstreamCluster> it = this.upstreamManager.findAllClusters().iterator();
        while (it.hasNext()) {
            it.next().getLoadBalancedRSocket().refreshUnHealthyUris();
        }
        return Mono.just("Begin to refresh unHealthy upstream clusters now!");
    }

    public Mono<Void> sendAppStatus(Integer num) {
        CloudEventImpl build = RSocketCloudEventBuilder.builder(new AppStatusEvent(RSocketAppContext.ID, num)).build();
        return Flux.fromIterable(this.upstreamManager.findAllClusters()).flatMap(upstreamCluster -> {
            return upstreamCluster.getLoadBalancedRSocket().fireCloudEventToUpstreamAll(build);
        }).then();
    }

    public Mono<Void> sendRegisterService(ServiceLocator serviceLocator) {
        CloudEventImpl<ServicesExposedEvent> convertServicesToCloudEvent = ServicesExposedEvent.convertServicesToCloudEvent(Collections.singletonList(serviceLocator));
        return Flux.fromIterable(this.upstreamManager.findAllClusters()).flatMap(upstreamCluster -> {
            return upstreamCluster.getLoadBalancedRSocket().fireCloudEventToUpstreamAll(convertServicesToCloudEvent);
        }).then();
    }

    public Mono<Void> sendUnRegisterService(ServiceLocator serviceLocator) {
        CloudEventImpl<ServicesHiddenEvent> convertServicesToCloudEvent = ServicesHiddenEvent.convertServicesToCloudEvent(Collections.singletonList(serviceLocator));
        return Flux.fromIterable(this.upstreamManager.findAllClusters()).flatMap(upstreamCluster -> {
            return upstreamCluster.getLoadBalancedRSocket().fireCloudEventToUpstreamAll(convertServicesToCloudEvent);
        }).then();
    }

    @Nullable
    private ServiceLocator findServiceLocator(String str) {
        ServiceLocator serviceLocator = null;
        Iterator<ServiceLocator> it = this.rsocketRequesterSupport.exposedServices().get().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            ServiceLocator next = it.next();
            if (str.equals(next.getService())) {
                serviceLocator = next;
                break;
            }
        }
        return serviceLocator;
    }

    public Integer getRsocketServiceStatus() {
        return this.rsocketServiceStatus;
    }

    public boolean isServiceProvider() {
        return this.serviceProvider;
    }
}
