package org.apache.iceberg.flink.source.enumerator;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.util.ElapsedTimeGauge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.class */
public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousIcebergEnumerator.class);
    private static final int ENUMERATION_SPLIT_COUNT_HISTORY_SIZE = 3;
    private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
    private final SplitAssigner assigner;
    private final ScanContext scanContext;
    private final ContinuousSplitPlanner splitPlanner;
    private final AtomicReference<IcebergEnumeratorPosition> enumeratorPosition;
    private final EnumerationHistory enumerationHistory;
    private transient int consecutiveFailures;
    private final ElapsedTimeGauge elapsedSecondsSinceLastSplitDiscovery;

    public ContinuousIcebergEnumerator(SplitEnumeratorContext<IcebergSourceSplit> splitEnumeratorContext, SplitAssigner splitAssigner, ScanContext scanContext, ContinuousSplitPlanner continuousSplitPlanner, @Nullable IcebergEnumeratorState icebergEnumeratorState) {
        super(splitEnumeratorContext, splitAssigner);
        this.consecutiveFailures = 0;
        this.enumeratorContext = splitEnumeratorContext;
        this.assigner = splitAssigner;
        this.scanContext = scanContext;
        this.splitPlanner = continuousSplitPlanner;
        this.enumeratorPosition = new AtomicReference<>();
        this.enumerationHistory = new EnumerationHistory(3);
        this.elapsedSecondsSinceLastSplitDiscovery = new ElapsedTimeGauge(TimeUnit.SECONDS);
        this.enumeratorContext.metricGroup().gauge("elapsedSecondsSinceLastSplitDiscovery", this.elapsedSecondsSinceLastSplitDiscovery);
        if (icebergEnumeratorState != null) {
            this.enumeratorPosition.set(icebergEnumeratorState.lastEnumeratedPosition());
            this.enumerationHistory.restore(icebergEnumeratorState.enumerationSplitCountHistory());
        }
    }

    @Override // org.apache.iceberg.flink.source.enumerator.AbstractIcebergEnumerator
    public void start() {
        super.start();
        this.enumeratorContext.callAsync(this::discoverSplits, this::processDiscoveredSplits, 0L, this.scanContext.monitorInterval().toMillis());
    }

    @Override // org.apache.iceberg.flink.source.enumerator.AbstractIcebergEnumerator
    public void close() throws IOException {
        this.splitPlanner.close();
        super.close();
    }

    @Override // org.apache.iceberg.flink.source.enumerator.AbstractIcebergEnumerator
    protected boolean shouldWaitForMoreSplits() {
        return true;
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public IcebergEnumeratorState m450snapshotState(long j) {
        return new IcebergEnumeratorState(this.enumeratorPosition.get(), this.assigner.state(), this.enumerationHistory.snapshot());
    }

    private ContinuousEnumerationResult discoverSplits() {
        int pendingSplitCount = this.assigner.pendingSplitCount();
        if (!this.enumerationHistory.shouldPauseSplitDiscovery(pendingSplitCount)) {
            return this.splitPlanner.planSplits(this.enumeratorPosition.get());
        }
        LOG.info("Pause split discovery as the assigner already has too many pending splits: {}", Integer.valueOf(pendingSplitCount));
        return new ContinuousEnumerationResult(Collections.emptyList(), this.enumeratorPosition.get(), this.enumeratorPosition.get());
    }

    private void processDiscoveredSplits(ContinuousEnumerationResult continuousEnumerationResult, Throwable th) {
        if (th != null) {
            this.consecutiveFailures++;
            if (this.scanContext.maxAllowedPlanningFailures() >= 0 && this.consecutiveFailures > this.scanContext.maxAllowedPlanningFailures()) {
                throw new RuntimeException("Failed to discover new splits", th);
            }
            LOG.error("Failed to discover new splits", th);
            return;
        }
        this.consecutiveFailures = 0;
        if (!Objects.equals(continuousEnumerationResult.fromPosition(), this.enumeratorPosition.get())) {
            LOG.info("Skip {} discovered splits because the scan starting position doesn't match the current enumerator position: enumerator position = {}, scan starting position = {}", new Object[]{Integer.valueOf(continuousEnumerationResult.splits().size()), this.enumeratorPosition.get(), continuousEnumerationResult.fromPosition()});
            return;
        }
        this.elapsedSecondsSinceLastSplitDiscovery.refreshLastRecordedTime();
        if (continuousEnumerationResult.splits().isEmpty()) {
            LOG.info("No new splits discovered between ({}, {}]", continuousEnumerationResult.fromPosition(), continuousEnumerationResult.toPosition());
        } else {
            this.assigner.onDiscoveredSplits(continuousEnumerationResult.splits());
            this.enumerationHistory.add(continuousEnumerationResult.splits().size());
            LOG.info("Added {} splits discovered between ({}, {}] to the assigner", new Object[]{Integer.valueOf(continuousEnumerationResult.splits().size()), continuousEnumerationResult.fromPosition(), continuousEnumerationResult.toPosition()});
        }
        this.enumeratorPosition.set(continuousEnumerationResult.toPosition());
        LOG.info("Update enumerator position to {}", continuousEnumerationResult.toPosition());
    }

    @Override // org.apache.iceberg.flink.source.enumerator.AbstractIcebergEnumerator
    public /* bridge */ /* synthetic */ void addReader(int i) {
        super.addReader(i);
    }

    @Override // org.apache.iceberg.flink.source.enumerator.AbstractIcebergEnumerator
    public /* bridge */ /* synthetic */ void addSplitsBack(List list, int i) {
        super.addSplitsBack(list, i);
    }

    @Override // org.apache.iceberg.flink.source.enumerator.AbstractIcebergEnumerator
    public /* bridge */ /* synthetic */ void handleSourceEvent(int i, SourceEvent sourceEvent) {
        super.handleSourceEvent(i, sourceEvent);
    }

    @Override // org.apache.iceberg.flink.source.enumerator.AbstractIcebergEnumerator
    public /* bridge */ /* synthetic */ void handleSplitRequest(int i, @Nullable String str) {
        super.handleSplitRequest(i, str);
    }
}
