package hs.jfx.eventstream.core.impl;

import hs.jfx.eventstream.api.ChangeStream;
import hs.jfx.eventstream.api.Emitter;
import hs.jfx.eventstream.api.EventStream;
import hs.jfx.eventstream.api.ObservableStream;
import hs.jfx.eventstream.api.OptionalValue;
import hs.jfx.eventstream.api.Subscriber;
import hs.jfx.eventstream.api.Subscription;
import hs.jfx.eventstream.api.ValueStream;
import java.util.Objects;
import java.util.function.Consumer;

/* loaded from: input_file:hs/jfx/eventstream/core/impl/PeekStreams.class */
public abstract class PeekStreams {
    public static <T> EventStream<T> event(ObservableStream<T> observableStream, Consumer<? super T> consumer) {
        return new BaseEventStream(subscriber(observableStream, (Consumer) Objects.requireNonNull(consumer)));
    }

    public static <T> ChangeStream<T> change(ObservableStream<T> observableStream, Consumer<? super T> consumer) {
        return new BaseChangeStream(subscriber(observableStream, (Consumer) Objects.requireNonNull(consumer)));
    }

    public static <T> ValueStream<T> value(ObservableStream<T> observableStream, Consumer<? super T> consumer) {
        return new BaseValueStream(subscriber(observableStream, (Consumer) Objects.requireNonNull(consumer)), observableStream, OptionalValue::of);
    }

    private static <T> Subscriber<T> subscriber(final ObservableStream<T> observableStream, final Consumer<? super T> consumer) {
        return new Subscriber<T>() { // from class: hs.jfx.eventstream.core.impl.PeekStreams.1
            private boolean sideEffectInProgress = false;

            public Subscription subscribe(Emitter<T> emitter) {
                ObservableStream observableStream2 = observableStream;
                Consumer consumer2 = consumer;
                return observableStream2.subscribe(obj -> {
                    if (this.sideEffectInProgress) {
                        throw new IllegalStateException("Side effect is not allowed to cause recursive event emission");
                    }
                    this.sideEffectInProgress = true;
                    try {
                        consumer2.accept(obj);
                        this.sideEffectInProgress = false;
                        emitter.emit(obj);
                    } catch (Throwable th) {
                        this.sideEffectInProgress = false;
                        throw th;
                    }
                });
            }
        };
    }
}
