package hs.jfx.eventstream.experimental.impl;

import hs.jfx.eventstream.api.Emitter;
import hs.jfx.eventstream.api.ObservableStream;
import hs.jfx.eventstream.api.OptionalValue;
import hs.jfx.eventstream.api.Subscriber;
import hs.jfx.eventstream.api.Subscription;
import hs.jfx.eventstream.core.impl.BaseChangeStream;
import hs.jfx.eventstream.core.impl.BaseEventStream;
import hs.jfx.eventstream.core.impl.BaseValueStream;
import hs.jfx.eventstream.experimental.Transactions;

/* loaded from: input_file:hs/jfx/eventstream/experimental/impl/TransactionalStream.class */
public abstract class TransactionalStream {

    /* loaded from: input_file:hs/jfx/eventstream/experimental/impl/TransactionalStream$Change.class */
    /**
     * Change stream whose subscriber is a {@code TransactionalSubscriber} wrapping
     * the given source stream.
     *
     * @param <T> type of the values emitted by the source and by this stream
     */
    public static class Change<T> extends BaseChangeStream<T, T> {

        /**
         * Creates a change stream on top of the given source.
         *
         * @param observableStream the source stream to wrap
         */
        public Change(ObservableStream<T> observableStream) {
            // Diamond instead of a raw type, so the subscriber's element type is
            // checked against T rather than silently erased.
            super(new TransactionalSubscriber<>(observableStream));
        }
    }

    /* loaded from: input_file:hs/jfx/eventstream/experimental/impl/TransactionalStream$Event.class */
    /**
     * Event stream whose subscriber is a {@code TransactionalSubscriber} wrapping
     * the given source stream.
     *
     * @param <T> type of the events emitted by the source and by this stream
     */
    public static class Event<T> extends BaseEventStream<T, T> {

        /**
         * Creates an event stream on top of the given source.
         *
         * @param observableStream the source stream to wrap
         */
        public Event(ObservableStream<T> observableStream) {
            // Diamond instead of a raw type, so the subscriber's element type is
            // checked against T rather than silently erased.
            super(new TransactionalSubscriber<>(observableStream));
        }
    }

    /* loaded from: input_file:hs/jfx/eventstream/experimental/impl/TransactionalStream$TransactionalSubscriber.class */
    /**
     * Subscriber that forwards events from a source stream immediately while
     * {@link Transactions#inProgress()} is {@code false}, and otherwise holds on to
     * the most recent event until the callback registered through
     * {@code Transactions.register} runs.
     *
     * <p>Only the latest event seen during a transaction is kept; earlier ones are
     * overwritten. The deferred state is tracked per subscription, so independent
     * subscriptions created from the same subscriber do not interfere with each
     * other.
     *
     * @param <T> type of the events passed through
     */
    private static class TransactionalSubscriber<T> implements Subscriber<T> {
        private final ObservableStream<T> source;

        /**
         * @param observableStream the stream whose events are forwarded or deferred
         */
        public TransactionalSubscriber(ObservableStream<T> observableStream) {
            this.source = observableStream;
        }

        /**
         * Subscribes to the source and relays its events to the given emitter,
         * deferring them while a transaction is in progress.
         *
         * @param emitter receiver of the (possibly deferred) events
         * @return a subscription which, when unsubscribed, detaches from the source
         *         and drops any event still waiting to be emitted
         */
        public Subscription subscribe(Emitter<T> emitter) {
            // Each subscription gets its own deferred-event state; keeping it on the
            // subscriber instance would make a second subscription reuse (and on
            // unsubscribe, clear) the first one's pending callback.
            DeferredEmission<T> deferred = new DeferredEmission<>(emitter);
            Subscription sourceSubscription = this.source.subscribe(event -> deferred.accept(event));

            return () -> {
                sourceSubscription.unsubscribe();
                deferred.cancel();
            };
        }

        /**
         * Per-subscription state: the emitter to deliver to, the latest event seen
         * during a transaction, and the handle of the registered callback.
         */
        private static final class DeferredEmission<T> {
            private final Emitter<T> emitter;
            private T storedEvent;
            private Subscription transactionFinishedSubscription;

            DeferredEmission(Emitter<T> emitter) {
                this.emitter = emitter;
            }

            /** Emits the event right away, or stores it if a transaction is in progress. */
            void accept(T event) {
                if (!Transactions.inProgress()) {
                    this.emitter.emit(event);
                    return;
                }

                // Register the callback only once per pending emission.
                if (this.transactionFinishedSubscription == null) {
                    this.transactionFinishedSubscription = Transactions.register(() -> {
                        this.emitter.emit(this.storedEvent);
                        reset();
                    });
                }

                // Later events within the same transaction replace earlier ones.
                this.storedEvent = event;
            }

            /** Unregisters a pending callback, if any, and discards the stored event. */
            void cancel() {
                if (this.transactionFinishedSubscription != null) {
                    this.transactionFinishedSubscription.unsubscribe();
                    reset();
                }
            }

            /** Clears the callback handle and the stored event. */
            private void reset() {
                this.transactionFinishedSubscription = null;
                this.storedEvent = null;
            }
        }
    }

    /* loaded from: input_file:hs/jfx/eventstream/experimental/impl/TransactionalStream$Value.class */
    /**
     * Value stream whose subscriber is a {@code TransactionalSubscriber} wrapping
     * the given source stream.
     *
     * @param <T> type of the values emitted by the source and by this stream
     */
    public static class Value<T> extends BaseValueStream<T, T> {

        /**
         * Creates a value stream on top of the given source. The super constructor
         * additionally receives a {@code null} second argument and
         * {@code OptionalValue::of} as its third argument.
         *
         * @param observableStream the source stream to wrap
         */
        public Value(ObservableStream<T> observableStream) {
            // Diamond instead of a raw TransactionalSubscriber so its element type is
            // checked against T.
            // NOTE(review): the cast on the null argument is kept unchanged because the
            // super constructor's declaration is not visible here and the cast may be
            // needed to select an overload -- confirm before parameterizing or removing it.
            super(new TransactionalSubscriber<>(observableStream), (ObservableStream) null, OptionalValue::of);
        }
    }
}
