package com.alibaba.rsocket.events;

import com.alibaba.rsocket.cloudevents.CloudEventImpl;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:BOOT-INF/lib/alibaba-rsocket-core-1.1.6.jar:com/alibaba/rsocket/events/CloudEventsProcessor.class */
/**
 * Dispatches every {@link CloudEventImpl} emitted by a {@link Sinks.Many} to the registered
 * {@link CloudEventsConsumer}s whose {@code shouldAccept} returns {@code true} for that event.
 *
 * <p>Consumers are held in a private {@link CopyOnWriteArrayList}, so {@link #addConsumer} may be
 * called while events are being dispatched without disturbing an in-progress iteration.
 */
public class CloudEventsProcessor {
    /** Registered consumers; a private copy so dispatch never iterates a list mutated elsewhere. */
    private final List<CloudEventsConsumer> consumers;
    /** Sink whose flux supplies the events to dispatch. */
    private final Sinks.Many<CloudEventImpl> eventProcessor;

    /**
     * Creates a processor bound to the given sink.
     *
     * @param many sink whose emitted events are dispatched once {@link #init()} has been called
     * @param list initial consumers; copied, so an immutable list is fine and later changes to
     *             the argument are not observed (use {@link #addConsumer}); {@code null} is
     *             treated as "no initial consumers"
     */
    public CloudEventsProcessor(Sinks.Many<CloudEventImpl> many, List<CloudEventsConsumer> list) {
        this.eventProcessor = many;
        this.consumers = (list == null)
                ? new CopyOnWriteArrayList<>()
                : new CopyOnWriteArrayList<>(list);
    }

    /**
     * Subscribes to the sink's flux and starts dispatching events to the consumers.
     *
     * <p>Every invocation creates an additional subscription, so this is intended to be called
     * once per instance.
     */
    public void init() {
        this.eventProcessor.asFlux().subscribe(this::dispatch);
    }

    /**
     * Registers an additional consumer; it is considered for events dispatched after this call.
     *
     * @param cloudEventsConsumer consumer to register
     */
    public void addConsumer(CloudEventsConsumer cloudEventsConsumer) {
        this.consumers.add(cloudEventsConsumer);
    }

    /** Offers one event to every consumer that accepts it and triggers the resulting publishers. */
    private void dispatch(CloudEventImpl cloudEvent) {
        // Deliberately a separate subscription per event rather than a flatMap on the outer flux:
        // a failure while handling one event terminates only this inner pipeline.
        // NOTE(review): inner errors have no explicit handler and go to Reactor's default
        // dropped-error handling - confirm that is the desired reporting.
        Flux.fromIterable(this.consumers)
                .filter(consumer -> consumer.shouldAccept(cloudEvent))
                .flatMap(consumer -> consumer.accept(cloudEvent))
                .subscribe();
    }
}
