package io.muserver.rest;

import io.muserver.ClientDisconnectedException;
import io.muserver.MuException;
import io.muserver.Mutils;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;

/* loaded from: input_file:io/muserver/rest/SseBroadcasterImpl.class */
class SseBroadcasterImpl implements SseBroadcaster {
    private volatile boolean isClosed = false;
    private final List<BiConsumer<SseEventSink, Throwable>> errorListeners = new CopyOnWriteArrayList();
    private final List<Consumer<SseEventSink>> closeListeners = new CopyOnWriteArrayList();
    private final List<SseEventSink> sinks = new CopyOnWriteArrayList();

    public void onError(BiConsumer<SseEventSink, Throwable> biConsumer) {
        Mutils.notNull("onError", biConsumer);
        throwIfClosed();
        this.errorListeners.add(biConsumer);
    }

    public void onClose(Consumer<SseEventSink> consumer) {
        Mutils.notNull("onClose", consumer);
        throwIfClosed();
        this.closeListeners.add(consumer);
    }

    public void register(SseEventSink sseEventSink) {
        Mutils.notNull("sseEventSink", sseEventSink);
        throwIfClosed();
        this.sinks.add(sseEventSink);
        if (sseEventSink instanceof JaxSseEventSinkImpl) {
            ((JaxSseEventSinkImpl) sseEventSink).setResponseCompleteHandler(responseInfo -> {
                Throwable muException;
                if (responseInfo.completedSuccessfully()) {
                    return;
                }
                switch (responseInfo.response().responseState()) {
                    case CLIENT_DISCONNECTED:
                        muException = new ClientDisconnectedException();
                        break;
                    case TIMED_OUT:
                        muException = new TimeoutException();
                        break;
                    case ERRORED:
                    default:
                        muException = new MuException("Generic error");
                        break;
                }
                onSinkErrored(sseEventSink, muException);
            });
        }
    }

    public CompletionStage<?> broadcast(OutboundSseEvent outboundSseEvent) {
        Mutils.notNull("event", outboundSseEvent);
        throwIfClosed();
        CompletableFuture completableFuture = new CompletableFuture();
        AtomicInteger atomicInteger = new AtomicInteger(this.sinks.size());
        for (SseEventSink sseEventSink : this.sinks) {
            if (sseEventSink.isClosed()) {
                this.sinks.remove(sseEventSink);
                sendOnCloseEvent(sseEventSink);
                sendComplete(completableFuture, atomicInteger);
            } else {
                sseEventSink.send(outboundSseEvent).whenComplete((obj, th) -> {
                    if (th != null) {
                        onSinkErrored(sseEventSink, th);
                    }
                    sendComplete(completableFuture, atomicInteger);
                });
            }
        }
        return completableFuture;
    }

    private void onSinkErrored(SseEventSink sseEventSink, Throwable th) {
        if (this.sinks.remove(sseEventSink)) {
            try {
                sseEventSink.close();
            } catch (Exception e) {
            }
            Iterator<BiConsumer<SseEventSink, Throwable>> it = this.errorListeners.iterator();
            while (it.hasNext()) {
                it.next().accept(sseEventSink, th);
            }
        }
    }

    private static void sendComplete(CompletableFuture<?> completableFuture, AtomicInteger atomicInteger) {
        if (atomicInteger.decrementAndGet() == 0) {
            completableFuture.complete(null);
        }
    }

    public void close() {
        if (this.isClosed) {
            return;
        }
        for (SseEventSink sseEventSink : this.sinks) {
            try {
                sseEventSink.close();
                sendOnCloseEvent(sseEventSink);
            } catch (Exception e) {
            }
        }
        this.sinks.clear();
        this.isClosed = true;
    }

    private void sendOnCloseEvent(SseEventSink sseEventSink) {
        Iterator<Consumer<SseEventSink>> it = this.closeListeners.iterator();
        while (it.hasNext()) {
            it.next().accept(sseEventSink);
        }
    }

    private void throwIfClosed() {
        if (this.isClosed) {
            throw new IllegalStateException("This broadcaster has already been closed");
        }
    }

    public int connectedSinksCount() {
        return this.sinks.size();
    }
}
