package com.github.dm.jrt.stream.transform;

import com.github.dm.jrt.core.channel.Channel;
import com.github.dm.jrt.core.channel.ChannelConsumer;
import com.github.dm.jrt.core.common.RoutineException;
import com.github.dm.jrt.core.runner.Execution;
import com.github.dm.jrt.core.runner.Runner;
import com.github.dm.jrt.core.util.ConstantConditions;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:com/github/dm/jrt/stream/transform/TimeoutChannelConsumer.class */
class TimeoutChannelConsumer<OUT> implements ChannelConsumer<OUT> {
    private final Channel<OUT, ?> mOutputChannel;
    private final Runner mRunner;
    private final long mTimeout;
    private final TimeUnit mTimeoutUnit;
    private final AtomicLong mCount = new AtomicLong();
    private TimeoutChannelConsumer<OUT>.AbortExecution mExecution = new AbortExecution();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/dm/jrt/stream/transform/TimeoutChannelConsumer$AbortExecution.class */
    public class AbortExecution implements Execution {
        private final long mExecutionCount;

        AbortExecution() {
            this.mExecutionCount = TimeoutChannelConsumer.this.mCount.incrementAndGet();
        }

        public void run() {
            if (this.mExecutionCount == TimeoutChannelConsumer.this.mCount.get()) {
                TimeoutChannelConsumer.this.mOutputChannel.abort(new ResultTimeoutException("timeout while waiting for inputs: [" + TimeoutChannelConsumer.this.mTimeout + " " + TimeoutChannelConsumer.this.mTimeout + "]"));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TimeoutChannelConsumer(long j, @NotNull TimeUnit timeUnit, @NotNull Runner runner, @NotNull Channel<OUT, ?> channel) {
        this.mTimeout = ConstantConditions.notNegative("timeout value", j);
        this.mTimeoutUnit = (TimeUnit) ConstantConditions.notNull("timeout unit", timeUnit);
        this.mRunner = (Runner) ConstantConditions.notNull("runner instance", runner);
        this.mOutputChannel = (Channel) ConstantConditions.notNull("output channel", channel);
    }

    public void onComplete() throws Exception {
        this.mOutputChannel.close();
    }

    public void onError(@NotNull RoutineException routineException) {
        this.mOutputChannel.abort(routineException);
    }

    public void onOutput(OUT out) {
        restartTimeout();
        this.mOutputChannel.pass(out);
    }

    private void restartTimeout() {
        Runner runner = this.mRunner;
        runner.cancel(this.mExecution);
        TimeoutChannelConsumer<OUT>.AbortExecution abortExecution = new AbortExecution();
        this.mExecution = abortExecution;
        runner.run(abortExecution, this.mTimeout, this.mTimeoutUnit);
    }
}
