package com.alibaba.rsocket.upstream;

import com.alibaba.rsocket.ServiceLocator;
import com.alibaba.rsocket.cloudevents.CloudEventImpl;
import com.alibaba.rsocket.events.CloudEventSupport;
import com.alibaba.rsocket.events.CloudEventsConsumer;
import com.alibaba.rsocket.observability.RsocketErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.propertyeditors.StringArrayPropertyEditor;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/alibaba-rsocket-core-1.1.6.jar:com/alibaba/rsocket/upstream/UpstreamClusterChangedEventConsumer.class */
/**
 * {@link CloudEventsConsumer} that reacts to {@link UpstreamClusterChangedEvent}s by replacing
 * the URI list of the matching {@link UpstreamCluster} looked up through an
 * {@link UpstreamManager}.
 */
public class UpstreamClusterChangedEventConsumer implements CloudEventsConsumer {
    private static final Logger log = LoggerFactory.getLogger(UpstreamClusterChangedEventConsumer.class);

    /** Prefix the event's sourcing value must start with for the event to be accepted. */
    private static final String BROKER_SOURCING_PREFIX = "upstream:broker:";

    /** Registry used to resolve the cluster addressed by an incoming event. */
    private final UpstreamManager upstreamManager;

    /**
     * Creates a consumer bound to the given upstream registry.
     *
     * @param upstreamManager manager queried for the cluster whose URIs should be updated
     */
    public UpstreamClusterChangedEventConsumer(UpstreamManager upstreamManager) {
        this.upstreamManager = upstreamManager;
    }

    /**
     * Decides whether this consumer handles the given event.
     *
     * @param cloudEventImpl the incoming cloud event
     * @return {@code true} when the event type equals (ignoring case) the canonical class name of
     *         {@link UpstreamClusterChangedEvent} and the sourcing value is non-null and starts
     *         with {@code "upstream:broker:"}
     */
    @Override
    public boolean shouldAccept(CloudEventImpl<?> cloudEventImpl) {
        String eventType = cloudEventImpl.getAttributes().getType();
        String sourcing = cloudEventImpl.getSourcing();
        if (!UpstreamClusterChangedEvent.class.getCanonicalName().equalsIgnoreCase(eventType)) {
            return false;
        }
        return sourcing != null && sourcing.startsWith(BROKER_SOURCING_PREFIX);
    }

    /**
     * Wraps {@link #handleUpstreamClusterChangedEvent(CloudEventImpl)} in a {@link Mono}, so the
     * handling runs when the returned {@code Mono} is subscribed to.
     *
     * @param cloudEventImpl the incoming cloud event
     * @return a {@code Mono} that completes after the event has been handled
     */
    @Override
    public Mono<Void> accept(CloudEventImpl<?> cloudEventImpl) {
        return Mono.fromRunnable(() -> handleUpstreamClusterChangedEvent(cloudEventImpl));
    }

    /**
     * Unwraps the event payload, resolves the target cluster by service id and replaces its URIs.
     * Does nothing when the payload unwraps to {@code null} or when no cluster is registered for
     * the computed service id.
     *
     * @param cloudEventImpl the incoming cloud event carrying an {@link UpstreamClusterChangedEvent}
     */
    public void handleUpstreamClusterChangedEvent(CloudEventImpl<?> cloudEventImpl) {
        UpstreamClusterChangedEvent changedEvent = (UpstreamClusterChangedEvent) CloudEventSupport.unwrapData(cloudEventImpl, UpstreamClusterChangedEvent.class);
        if (changedEvent == null) {
            return;
        }
        // The service id is derived from group + interface name + version of the event.
        String serviceId = ServiceLocator.serviceId(changedEvent.getGroup(), changedEvent.getInterfaceName(), changedEvent.getVersion());
        UpstreamCluster cluster = this.upstreamManager.findClusterByServiceId(serviceId);
        if (cluster == null) {
            return;
        }
        cluster.setUris(changedEvent.getUris());
        // RST-300202 is a message code resolved by RsocketErrorCode; it receives the service id
        // and the joined URI list as arguments.
        log.info(RsocketErrorCode.message("RST-300202", serviceId, String.join(StringArrayPropertyEditor.DEFAULT_SEPARATOR, changedEvent.getUris())));
    }
}
