package com.github.dm.jrt.stream.transform;

import com.github.dm.jrt.core.JRoutineCore;
import com.github.dm.jrt.core.builder.ChannelBuilder;
import com.github.dm.jrt.core.channel.Channel;
import com.github.dm.jrt.core.config.ChannelConfiguration;
import com.github.dm.jrt.core.invocation.InvocationInterruptedException;
import com.github.dm.jrt.core.util.ConstantConditions;
import com.github.dm.jrt.core.util.SimpleQueue;
import com.github.dm.jrt.function.BiFunction;
import com.github.dm.jrt.function.Function;
import com.github.dm.jrt.stream.builder.StreamBuilder;
import com.github.dm.jrt.stream.transform.ThrottleChannelConsumer;
import org.jetbrains.annotations.NotNull;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/github/dm/jrt/stream/transform/BindThrottle.class */
/**
 * Stream transformation that throttles the binding of channels.
 * <p>
 * Every binding function produced by the same {@code BindThrottle} instance shares one counter
 * and one queue. At most {@code mMaxCount} bindings are started immediately; any further binding
 * is wrapped in a {@link Runnable} and queued until {@link BindingFunction#onComplete()} is
 * invoked for a previous one.
 * <p>
 * All accesses to the counter and to the queue are guarded by {@code mMutex}.
 *
 * @param <IN>  the input data type.
 * @param <OUT> the output data type.
 */
public class BindThrottle<IN, OUT> implements BiFunction<StreamBuilder.StreamConfiguration, Function<Channel<?, IN>, Channel<?, OUT>>, Function<? super Channel<?, IN>, ? extends Channel<?, OUT>>> {
    /** Maximum number of bindings started without being queued. */
    private final int mMaxCount;
    /** Lock guarding {@link #mCount} and {@link #mQueue}. */
    private final Object mMutex = new Object();
    /** Deferred bindings, waiting for a previous one to complete. */
    private final SimpleQueue<Runnable> mQueue = new SimpleQueue<>();
    /** Number of bindings requested and not yet completed (both started and queued ones). */
    private int mCount;

    /**
     * Binding function wrapper enforcing the throttle of the enclosing instance.
     * <p>
     * It also acts as the completion handler passed to the {@link ThrottleChannelConsumer}
     * (presumably notified when the bound channel completes - confirm against that class).
     */
    public class BindingFunction implements Function<Channel<?, IN>, Channel<?, OUT>>, ThrottleChannelConsumer.CompletionHandler {
        private final Function<? super Channel<?, IN>, ? extends Channel<?, OUT>> mBindingFunction;
        private final ChannelConfiguration mConfiguration;

        /**
         * Constructor.
         *
         * @param channelConfiguration the configuration used to build the output channels.
         * @param function             the wrapped binding function.
         */
        private BindingFunction(@NotNull ChannelConfiguration channelConfiguration, @NotNull Function<? super Channel<?, IN>, ? extends Channel<?, OUT>> function) {
            this.mConfiguration = (ChannelConfiguration) ConstantConditions.notNull("channel configuration", channelConfiguration);
            this.mBindingFunction = (Function) ConstantConditions.notNull("binding function", function);
        }

        /**
         * Returns a new output channel and binds the wrapped function result to it, either
         * immediately or, when the maximum count has been exceeded, after a previous binding
         * completes.
         *
         * @param channel the input channel.
         * @return the output channel.
         * @throws Exception if the binding is started immediately and fails; in that case the
         *                   acquired slot is released before the exception propagates.
         */
        public Channel<?, OUT> apply(final Channel<?, IN> channel) throws Exception {
            final Channel<?, OUT> outputChannel = ((ChannelBuilder) JRoutineCore.io().apply(this.mConfiguration)).buildChannel();
            final boolean bindNow;
            synchronized (BindThrottle.this.mMutex) {
                // The counter is incremented for queued bindings too: onComplete() decrements it
                // exactly once per binding, whether it was started immediately or deferred.
                bindNow = BindThrottle.access$204(BindThrottle.this) <= BindThrottle.this.mMaxCount;
                if (!bindNow) {
                    BindThrottle.this.mQueue.add(new Runnable() {
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                BindingFunction.this.bindThrottled(channel, outputChannel);
                            } catch (Throwable th) {
                                // Nobody is waiting on this call: report the failure through the
                                // output channel and free the slot for the next queued binding.
                                outputChannel.abort(th);
                                BindingFunction.this.onComplete();
                                InvocationInterruptedException.throwIfInterrupt(th);
                            }
                        }
                    });
                }
            }
            if (bindNow) {
                boolean bound = false;
                try {
                    bindThrottled(channel, outputChannel);
                    bound = true;
                } finally {
                    if (!bound) {
                        // The binding failed before it could be handed over to the consumer:
                        // release the slot here, as the queued path does, otherwise the counter
                        // would never be decremented and later bindings could stay queued forever.
                        onComplete();
                    }
                }
            }
            return outputChannel;
        }

        /**
         * Releases the slot held by a completed binding and starts the next queued one, if any.
         */
        @Override // com.github.dm.jrt.stream.transform.ThrottleChannelConsumer.CompletionHandler
        public void onComplete() {
            final Runnable next;
            synchronized (BindThrottle.this.mMutex) {
                BindThrottle.access$206(BindThrottle.this);
                final SimpleQueue<Runnable> queue = BindThrottle.this.mQueue;
                if (queue.isEmpty()) {
                    return;
                }
                next = queue.removeFirst();
            }
            // Run the deferred binding only after releasing the lock: it invokes the wrapped
            // binding function and Channel.bind(), which must not execute while other threads
            // are kept waiting on the throttle mutex.
            next.run();
        }

        /**
         * Applies the wrapped binding function to the input channel and binds the result to a
         * consumer created from this handler and the output channel.
         *
         * @param channel       the input channel.
         * @param outputChannel the output channel.
         * @throws Exception if the wrapped function or the binding fails.
         */
        private void bindThrottled(Channel<?, IN> channel, Channel<?, OUT> outputChannel) throws Exception {
            // NOTE(review): raw casts retained from the decompiled form; the generic signatures
            // of Channel.bind and ThrottleChannelConsumer are not visible from this file.
            ((Channel) this.mBindingFunction.apply(channel)).bind(new ThrottleChannelConsumer(this, outputChannel));
        }
    }

    /**
     * Constructor.
     *
     * @param i the maximum number of bindings started without queuing; validated through
     *          {@code ConstantConditions.positive} (presumably rejecting values less than 1).
     */
    public BindThrottle(int i) {
        this.mMaxCount = ConstantConditions.positive("max count", i);
    }

    /**
     * Wraps the specified binding function into a throttled one sharing this instance state.
     *
     * @param streamConfiguration the stream configuration, converted to a channel configuration.
     * @param function            the binding function to throttle.
     * @return the throttled binding function.
     */
    public Function<? super Channel<?, IN>, ? extends Channel<?, OUT>> apply(StreamBuilder.StreamConfiguration streamConfiguration, Function<Channel<?, IN>, Channel<?, OUT>> function) {
        return new BindingFunction(streamConfiguration.toChannelConfiguration(), function);
    }

    /**
     * Accessor equivalent to {@code ++mCount}. Callers must hold {@code mMutex}.
     *
     * @param bindThrottle the instance whose counter is incremented.
     * @return the incremented count.
     */
    static /* synthetic */ int access$204(BindThrottle bindThrottle) {
        int i = bindThrottle.mCount + 1;
        bindThrottle.mCount = i;
        return i;
    }

    /**
     * Accessor equivalent to {@code --mCount}. Callers must hold {@code mMutex}.
     *
     * @param bindThrottle the instance whose counter is decremented.
     * @return the decremented count.
     */
    static /* synthetic */ int access$206(BindThrottle bindThrottle) {
        int i = bindThrottle.mCount - 1;
        bindThrottle.mCount = i;
        return i;
    }
}
