package com.alibaba.rsocket.upstream;

import com.alibaba.rsocket.ServiceLocator;
import com.alibaba.rsocket.cloudevents.CloudEventImpl;
import com.alibaba.rsocket.events.CloudEventSupport;
import com.alibaba.rsocket.events.CloudEventsConsumer;
import com.alibaba.rsocket.observability.RsocketErrorCode;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.propertyeditors.StringArrayPropertyEditor;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/alibaba-rsocket-core-1.1.6.jar:com/alibaba/rsocket/upstream/ServiceInstancesChangedEventConsumer.class */
public class ServiceInstancesChangedEventConsumer implements CloudEventsConsumer {
    private static Logger log = LoggerFactory.getLogger((Class<?>) ServiceInstancesChangedEventConsumer.class);
    private UpstreamManager upstreamManager;

    public ServiceInstancesChangedEventConsumer(UpstreamManager upstreamManager) {
        this.upstreamManager = upstreamManager;
    }

    @Override // com.alibaba.rsocket.events.CloudEventsConsumer
    public boolean shouldAccept(CloudEventImpl<?> cloudEventImpl) {
        return ServiceInstancesChangedEvent.class.getCanonicalName().equalsIgnoreCase(cloudEventImpl.getAttributes().getType());
    }

    @Override // com.alibaba.rsocket.events.CloudEventsConsumer
    public Mono<Void> accept(CloudEventImpl<?> cloudEventImpl) {
        return Mono.fromRunnable(() -> {
            handleServicesChangedEvent(cloudEventImpl);
        });
    }

    public void handleServicesChangedEvent(CloudEventImpl<?> cloudEventImpl) {
        ServiceInstancesChangedEvent serviceInstancesChangedEvent = (ServiceInstancesChangedEvent) CloudEventSupport.unwrapData(cloudEventImpl, ServiceInstancesChangedEvent.class);
        if (serviceInstancesChangedEvent != null) {
            String serviceId = ServiceLocator.serviceId(serviceInstancesChangedEvent.getGroup(), serviceInstancesChangedEvent.getService(), serviceInstancesChangedEvent.getVersion());
            UpstreamCluster findClusterByServiceId = this.upstreamManager.findClusterByServiceId(serviceId);
            if (findClusterByServiceId != null) {
                if (serviceInstancesChangedEvent.getUris().isEmpty()) {
                    this.upstreamManager.remove(findClusterByServiceId);
                    Mono.delay(Duration.ofSeconds(60L)).subscribe(l -> {
                        try {
                            findClusterByServiceId.close();
                        } catch (Exception e) {
                        }
                    });
                } else {
                    findClusterByServiceId.setUris(serviceInstancesChangedEvent.getUris());
                }
                log.info(RsocketErrorCode.message("RST-300202", serviceId, String.join(StringArrayPropertyEditor.DEFAULT_SEPARATOR, serviceInstancesChangedEvent.getUris())));
                return;
            }
            if (serviceInstancesChangedEvent.getUris().isEmpty()) {
                return;
            }
            try {
                UpstreamCluster upstreamCluster = new UpstreamCluster(serviceInstancesChangedEvent.getGroup(), serviceInstancesChangedEvent.getService(), serviceInstancesChangedEvent.getVersion(), serviceInstancesChangedEvent.getUris());
                upstreamCluster.setRsocketAware(this.upstreamManager.requesterSupport());
                upstreamCluster.init();
                this.upstreamManager.add(upstreamCluster);
                log.info(RsocketErrorCode.message("RST-300202", serviceId, String.join(StringArrayPropertyEditor.DEFAULT_SEPARATOR, serviceInstancesChangedEvent.getUris())));
            } catch (Exception e) {
                log.error(RsocketErrorCode.message("RST-400500", String.join(StringArrayPropertyEditor.DEFAULT_SEPARATOR, serviceInstancesChangedEvent.getUris())), (Throwable) e);
            }
        }
    }
}
