package com.alibaba.spring.boot.rsocket;

import brave.Tracing;
import com.alibaba.rsocket.RSocketAppContext;
import com.alibaba.rsocket.RSocketRequesterSupport;
import com.alibaba.rsocket.cloudevents.CloudEventImpl;
import com.alibaba.rsocket.events.CloudEventsConsumer;
import com.alibaba.rsocket.events.CloudEventsProcessor;
import com.alibaba.rsocket.health.RSocketServiceHealth;
import com.alibaba.rsocket.listen.RSocketResponderHandlerFactory;
import com.alibaba.rsocket.observability.MetricsService;
import com.alibaba.rsocket.route.RoutingEndpoint;
import com.alibaba.rsocket.rpc.LocalReactiveServiceCaller;
import com.alibaba.rsocket.rpc.RSocketResponderHandler;
import com.alibaba.rsocket.upstream.ServiceInstancesChangedEventConsumer;
import com.alibaba.rsocket.upstream.UpstreamCluster;
import com.alibaba.rsocket.upstream.UpstreamClusterChangedEventConsumer;
import com.alibaba.rsocket.upstream.UpstreamManager;
import com.alibaba.spring.boot.rsocket.health.RSocketServiceHealthImpl;
import com.alibaba.spring.boot.rsocket.observability.MetricsServicePrometheusImpl;
import com.alibaba.spring.boot.rsocket.responder.RSocketServicesPublishHook;
import com.alibaba.spring.boot.rsocket.responder.invocation.RSocketServiceAnnotationProcessor;
import com.alibaba.spring.boot.rsocket.upstream.JwtTokenNotFoundException;
import com.alibaba.spring.boot.rsocket.upstream.RSocketRequesterSupportBuilderImpl;
import com.alibaba.spring.boot.rsocket.upstream.RSocketRequesterSupportCustomizer;
import com.alibaba.spring.boot.rsocket.upstream.SmartLifecycleUpstreamManagerImpl;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.rsocket.SocketAcceptor;
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@EnableConfigurationProperties({RSocketProperties.class})
@Configuration
@ConditionalOnExpression("${rsocket.disabled:false}==false")
/* loaded from: input_file:BOOT-INF/lib/alibaba-rsocket-spring-boot-starter-1.1.6.jar:com/alibaba/spring/boot/rsocket/RSocketAutoConfiguration.class */
public class RSocketAutoConfiguration {

    @Autowired
    private RSocketProperties properties;

    @Value("${server.port:0}")
    private int serverPort;

    @Value("${management.server.port:0}")
    private int managementServerPort;

    @Autowired
    private ApplicationContext applicationContext;

    @Bean
    public Sinks.Many<CloudEventImpl> reactiveCloudEventProcessor() {
        return Sinks.many().multicast().onBackpressureBuffer();
    }

    @Bean(initMethod = "init")
    public CloudEventsProcessor cloudEventsProcessor(@Autowired @Qualifier("reactiveCloudEventProcessor") Sinks.Many<CloudEventImpl> many, ObjectProvider<CloudEventsConsumer> objectProvider) {
        return new CloudEventsProcessor(many, (List) objectProvider.stream().collect(Collectors.toList()));
    }

    @Bean
    public UpstreamClusterChangedEventConsumer upstreamClusterChangedEventConsumer(@Autowired UpstreamManager upstreamManager) {
        return new UpstreamClusterChangedEventConsumer(upstreamManager);
    }

    @Bean
    public ServiceInstancesChangedEventConsumer serviceInstancesChangedEventConsumer(@Autowired UpstreamManager upstreamManager) {
        return new ServiceInstancesChangedEventConsumer(upstreamManager);
    }

    @Bean
    public CloudEventToListenerConsumer cloudEventToListenerConsumer() {
        return new CloudEventToListenerConsumer();
    }

    @ConditionalOnMissingBean(type = {"brave.Tracing", "com.alibaba.rsocket.listen.RSocketResponderHandlerFactory"})
    @Bean
    public RSocketResponderHandlerFactory rsocketResponderHandlerFactory(@Autowired LocalReactiveServiceCaller localReactiveServiceCaller, @Autowired @Qualifier("reactiveCloudEventProcessor") Sinks.Many<CloudEventImpl> many) {
        return (connectionSetupPayload, rSocket) -> {
            return Mono.fromCallable(() -> {
                return new RSocketResponderHandler(localReactiveServiceCaller, many, rSocket, connectionSetupPayload);
            });
        };
    }

    @ConditionalOnMissingBean
    @ConditionalOnBean(type = {"brave.Tracing"})
    @Bean
    public RSocketResponderHandlerFactory rsocketResponderHandlerFactoryWithZipkin(@Autowired LocalReactiveServiceCaller localReactiveServiceCaller, @Autowired @Qualifier("reactiveCloudEventProcessor") Sinks.Many<CloudEventImpl> many) {
        return (connectionSetupPayload, rSocket) -> {
            return Mono.fromCallable(() -> {
                RSocketResponderHandler rSocketResponderHandler = new RSocketResponderHandler(localReactiveServiceCaller, many, rSocket, connectionSetupPayload);
                rSocketResponderHandler.setTracer(((Tracing) this.applicationContext.getBean(Tracing.class)).tracer());
                return rSocketResponderHandler;
            });
        };
    }

    @ConditionalOnMissingBean({RSocketRequesterSupport.class})
    @Bean
    public RSocketRequesterSupport rsocketRequesterSupport(@Autowired RSocketProperties rSocketProperties, @Autowired Environment environment, @Autowired SocketAcceptor socketAcceptor, @Autowired ObjectProvider<RSocketRequesterSupportCustomizer> objectProvider) {
        RSocketRequesterSupportBuilderImpl rSocketRequesterSupportBuilderImpl = new RSocketRequesterSupportBuilderImpl(rSocketProperties, new EnvironmentProperties(environment), socketAcceptor);
        objectProvider.orderedStream().forEach(rSocketRequesterSupportCustomizer -> {
            rSocketRequesterSupportCustomizer.customize(rSocketRequesterSupportBuilderImpl);
        });
        return rSocketRequesterSupportBuilderImpl.build();
    }

    @ConditionalOnMissingBean({LocalReactiveServiceCaller.class})
    @Bean
    public RSocketServiceAnnotationProcessor rSocketServiceAnnotationProcessor(RSocketProperties rSocketProperties) {
        return new RSocketServiceAnnotationProcessor(rSocketProperties);
    }

    @Bean(initMethod = "init")
    public UpstreamManager rsocketUpstreamManager(@Autowired RSocketRequesterSupport rSocketRequesterSupport) throws JwtTokenNotFoundException {
        SmartLifecycleUpstreamManagerImpl smartLifecycleUpstreamManagerImpl = new SmartLifecycleUpstreamManagerImpl(rSocketRequesterSupport);
        if (this.properties.getBrokers() != null && !this.properties.getBrokers().isEmpty()) {
            if (this.properties.getJwtToken() == null || this.properties.getJwtToken().isEmpty()) {
                throw new JwtTokenNotFoundException();
            }
            UpstreamCluster upstreamCluster = new UpstreamCluster(null, "*", null);
            upstreamCluster.setUris(this.properties.getBrokers());
            smartLifecycleUpstreamManagerImpl.add(upstreamCluster);
        }
        smartLifecycleUpstreamManagerImpl.setP2pServices(this.properties.getP2pServices());
        if (this.properties.getRoutes() != null && !this.properties.getRoutes().isEmpty()) {
            for (RoutingEndpoint routingEndpoint : this.properties.getRoutes()) {
                UpstreamCluster upstreamCluster2 = new UpstreamCluster(routingEndpoint.getGroup(), routingEndpoint.getService(), routingEndpoint.getVersion());
                upstreamCluster2.setUris(routingEndpoint.getUris());
                smartLifecycleUpstreamManagerImpl.add(upstreamCluster2);
            }
        }
        return smartLifecycleUpstreamManagerImpl;
    }

    @ConditionalOnProperty({"rsocket.brokers"})
    @Bean
    public RSocketBrokerHealthIndicator rsocketBrokerHealth(RSocketEndpoint rSocketEndpoint, UpstreamManager upstreamManager, @Value("${rsocket.brokers}") String str) {
        return new RSocketBrokerHealthIndicator(rSocketEndpoint, upstreamManager, str);
    }

    @Bean
    public RSocketEndpoint rsocketEndpoint(@Autowired UpstreamManager upstreamManager, @Autowired RSocketRequesterSupport rSocketRequesterSupport) {
        return new RSocketEndpoint(this.properties, upstreamManager, rSocketRequesterSupport);
    }

    @ConditionalOnClass({PrometheusMeterRegistry.class})
    @Bean
    public MetricsService metricsService(PrometheusMeterRegistry prometheusMeterRegistry) {
        return new MetricsServicePrometheusImpl(prometheusMeterRegistry);
    }

    @Bean
    public RSocketServicesPublishHook rsocketServicesPublishHook() {
        return new RSocketServicesPublishHook();
    }

    @ConditionalOnMissingBean
    @Bean
    public RSocketServiceHealth rsocketServiceHealth() {
        return new RSocketServiceHealthImpl();
    }

    @Bean
    public ApplicationListener<WebServerInitializedEvent> webServerInitializedEventApplicationListener() {
        return webServerInitializedEvent -> {
            String serverNamespace = webServerInitializedEvent.getApplicationContext().getServerNamespace();
            int port = webServerInitializedEvent.getWebServer().getPort();
            if ("management".equals(serverNamespace)) {
                this.managementServerPort = port;
                RSocketAppContext.managementPort = port;
                return;
            }
            this.serverPort = port;
            RSocketAppContext.webPort = port;
            if (this.managementServerPort == 0) {
                this.managementServerPort = port;
                RSocketAppContext.managementPort = port;
            }
        };
    }
}
