package com.github.dm.jrt.stream.transform;

import com.github.dm.jrt.core.JRoutineCore;
import com.github.dm.jrt.core.channel.AbortException;
import com.github.dm.jrt.core.channel.Channel;
import com.github.dm.jrt.core.channel.ChannelConsumer;
import com.github.dm.jrt.core.common.RoutineException;
import com.github.dm.jrt.core.invocation.InvocationException;
import com.github.dm.jrt.core.invocation.InvocationInterruptedException;
import com.github.dm.jrt.core.runner.Execution;
import com.github.dm.jrt.core.runner.Runner;
import com.github.dm.jrt.core.util.ConstantConditions;
import com.github.dm.jrt.function.BiFunction;
import com.github.dm.jrt.function.Function;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:com/github/dm/jrt/stream/transform/RetryChannelConsumer.class */
/**
 * Channel consumer driving a retry loop around a binding function.
 * <p>
 * Each attempt ({@link #run()}) builds a fresh channel, feeds it from the input channel, applies
 * the binding function to it and binds the resulting channel to this consumer. Outputs of the
 * attempt are buffered and forwarded to the output channel only when the attempt completes. On
 * error, the backoff function is asked for a delay: a non-null value schedules a new attempt on the
 * runner after that many milliseconds, a null value aborts both the output and the input channel.
 *
 * @param <IN>  the input data type.
 * @param <OUT> the output data type.
 */
class RetryChannelConsumer<IN, OUT> implements Execution, ChannelConsumer<OUT> {
    private final BiFunction<? super Integer, ? super RoutineException, ? extends Long> mBackoffFunction;
    private final Function<Channel<?, IN>, Channel<?, OUT>> mBindingFunction;
    private final Channel<?, IN> mInputChannel;
    private final Channel<OUT, ?> mOutputChannel;
    // Outputs of the current attempt; cleared before every retry, flushed on completion.
    private final ArrayList<OUT> mOutputs = new ArrayList<>();
    private final Runner mRunner;
    // Number of times the backoff function has been consulted so far.
    private int mCount;

    /**
     * Channel consumer forwarding every event to a wrapped channel, ignoring failures raised
     * while passing outputs.
     *
     * @param <IN> the data type accepted by the wrapped channel.
     */
    private static class SafeChannelConsumer<IN> implements ChannelConsumer<IN> {
        private final Channel<IN, ?> mChannel;

        /**
         * Constructor.
         *
         * @param channel the channel receiving the forwarded events.
         */
        private SafeChannelConsumer(@NotNull Channel<IN, ?> channel) {
            this.mChannel = channel;
        }

        /**
         * Closes the wrapped channel.
         */
        @Override
        public void onComplete() {
            this.mChannel.close();
        }

        /**
         * Aborts the wrapped channel with the received error.
         *
         * @param routineException the error.
         */
        @Override
        public void onError(@NotNull RoutineException routineException) {
            this.mChannel.abort(routineException);
        }

        /**
         * Passes the output to the wrapped channel. Only interruptions are propagated; any other
         * failure is dropped.
         *
         * @param in the output to forward.
         */
        @Override
        public void onOutput(IN in) {
            try {
                this.mChannel.pass(in);
            } catch (InvocationInterruptedException e) {
                throw e;
            } catch (Throwable ignored) {
                // Deliberately best-effort: a failure to pass data must not reach the input
                // channel. NOTE(review): presumably the wrapped channel may already have been
                // aborted by a failed attempt - confirm.
            }
        }
    }

    /**
     * Constructor.
     *
     * @param channel    the input channel feeding every attempt.
     * @param channel2   the output channel receiving the outputs of the successful attempt.
     * @param runner     the runner used to schedule retries.
     * @param function   the binding function applied to the channel built for each attempt.
     * @param biFunction the backoff function, called with the attempt count and the error; a
     *                   null result means no further retry.
     */
    public RetryChannelConsumer(@NotNull Channel<?, IN> channel, @NotNull Channel<OUT, ?> channel2, @NotNull Runner runner, @NotNull Function<Channel<?, IN>, Channel<?, OUT>> function, @NotNull BiFunction<? super Integer, ? super RoutineException, ? extends Long> biFunction) {
        this.mInputChannel = ConstantConditions.notNull("input channel instance", channel);
        this.mOutputChannel = ConstantConditions.notNull("output channel instance", channel2);
        this.mRunner = ConstantConditions.notNull("runner instance", runner);
        this.mBindingFunction = ConstantConditions.notNull("binding function", function);
        this.mBackoffFunction = ConstantConditions.notNull("backoff function", biFunction);
    }

    /**
     * Flushes the buffered outputs to the output channel and closes it. If that fails, the output
     * channel is aborted with the failure and interruptions are re-thrown.
     */
    @Override
    public void onComplete() {
        final Channel<OUT, ?> outputChannel = this.mOutputChannel;
        try {
            outputChannel.pass(this.mOutputs).close();
        } catch (Throwable th) {
            outputChannel.abort(th);
            InvocationInterruptedException.throwIfInterrupt(th);
        }
    }

    /**
     * Performs one attempt: builds a new channel, binds the input channel to it, applies the
     * binding function and binds the resulting channel to this consumer. A failure of the binding
     * function aborts both the output and the input channel.
     */
    @Override
    public void run() {
        final Channel<IN, IN> attemptChannel = JRoutineCore.io().buildChannel();
        this.mInputChannel.bind(new SafeChannelConsumer<IN>(attemptChannel));
        try {
            this.mBindingFunction.apply(attemptChannel).bind(this);
        } catch (Throwable th) {
            abort(th);
            InvocationInterruptedException.throwIfInterrupt(th);
        }
    }

    /**
     * Aborts both the output and the input channel with the specified failure, wrapped through
     * {@code InvocationException.wrapIfNeeded}.
     *
     * @param th the failure.
     */
    private void abort(@NotNull Throwable th) {
        final RoutineException reason = InvocationException.wrapIfNeeded(th);
        this.mOutputChannel.abort(reason);
        this.mInputChannel.abort(reason);
    }

    /**
     * Handles the failure of an attempt. Unless the error is an {@code AbortException}, the
     * backoff function is called with the incremented attempt count and the error. A non-null
     * result discards the buffered outputs and schedules a new attempt after that many
     * milliseconds; otherwise both channels are aborted with the error.
     *
     * @param routineException the error which made the attempt fail.
     */
    @Override
    public void onError(@NotNull RoutineException routineException) {
        Long delayMillis = null;
        if (!(routineException instanceof AbortException)) {
            try {
                final int attempt = ++this.mCount;
                delayMillis = this.mBackoffFunction.apply(Integer.valueOf(attempt), routineException);
            } catch (Throwable th) {
                abort(th);
                InvocationInterruptedException.throwIfInterrupt(th);
                // Both channels have just been aborted with the backoff failure: do not fall
                // through and abort them a second time with the original error.
                return;
            }
        }
        if (delayMillis != null) {
            this.mOutputs.clear();
            this.mRunner.run(this, delayMillis.longValue(), TimeUnit.MILLISECONDS);
        } else {
            this.mOutputChannel.abort(routineException);
            this.mInputChannel.abort(routineException);
        }
    }

    /**
     * Buffers the output until the current attempt completes.
     *
     * @param out the output.
     */
    @Override
    public void onOutput(OUT out) {
        this.mOutputs.add(out);
    }
}
