package org.apache.iceberg.flink.source.reader;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Queue;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.class */
public class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, IcebergSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceSplitReader.class);
    private final IcebergSourceReaderMetrics metrics;
    private final ReaderFunction<T> openSplitFunction;
    private final SerializableComparator<IcebergSourceSplit> splitComparator;
    private final int indexOfSubtask;
    private final Queue<IcebergSourceSplit> splits = Queues.newArrayDeque();
    private CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> currentReader;
    private IcebergSourceSplit currentSplit;
    private String currentSplitId;

    /* JADX INFO: Access modifiers changed from: package-private */
    public IcebergSourceSplitReader(IcebergSourceReaderMetrics icebergSourceReaderMetrics, ReaderFunction<T> readerFunction, SerializableComparator<IcebergSourceSplit> serializableComparator, SourceReaderContext sourceReaderContext) {
        this.metrics = icebergSourceReaderMetrics;
        this.openSplitFunction = readerFunction;
        this.splitComparator = serializableComparator;
        this.indexOfSubtask = sourceReaderContext.getIndexOfSubtask();
    }

    public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
        this.metrics.incrementSplitReaderFetchCalls(1L);
        if (this.currentReader == null) {
            IcebergSourceSplit poll = this.splits.poll();
            if (poll == null) {
                return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
            }
            this.currentSplit = poll;
            this.currentSplitId = poll.splitId();
            this.currentReader = (CloseableIterator) this.openSplitFunction.apply(this.currentSplit);
        }
        if (!this.currentReader.hasNext()) {
            return finishSplit();
        }
        try {
            return this.currentReader.next();
        } catch (UncheckedIOException e) {
            throw e.getCause();
        }
    }

    public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("Unsupported split change: %s", splitsChange.getClass()));
        }
        if (this.splitComparator != null) {
            ArrayList newArrayList = Lists.newArrayList(splitsChange.splits());
            newArrayList.sort(this.splitComparator);
            LOG.info("Add {} splits to reader: {}", Integer.valueOf(newArrayList.size()), newArrayList);
            this.splits.addAll(newArrayList);
        } else {
            LOG.info("Add {} splits to reader", Integer.valueOf(splitsChange.splits().size()));
            this.splits.addAll(splitsChange.splits());
        }
        this.metrics.incrementAssignedSplits(splitsChange.splits().size());
        this.metrics.incrementAssignedBytes(calculateBytes(splitsChange));
    }

    public void wakeUp() {
    }

    public void close() throws Exception {
        this.currentSplitId = null;
        if (this.currentReader != null) {
            this.currentReader.close();
        }
    }

    private long calculateBytes(IcebergSourceSplit icebergSourceSplit) {
        return ((Long) icebergSourceSplit.task().files().stream().map((v0) -> {
            return v0.length();
        }).reduce(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        })).longValue();
    }

    private long calculateBytes(SplitsChange<IcebergSourceSplit> splitsChange) {
        return ((Long) splitsChange.splits().stream().map(this::calculateBytes).reduce(0L, (v0, v1) -> {
            return Long.sum(v0, v1);
        })).longValue();
    }

    private ArrayBatchRecords<T> finishSplit() throws IOException {
        if (this.currentReader != null) {
            this.currentReader.close();
            this.currentReader = null;
        }
        ArrayBatchRecords<T> finishedSplit = ArrayBatchRecords.finishedSplit(this.currentSplitId);
        LOG.info("Split reader {} finished split: {}", Integer.valueOf(this.indexOfSubtask), this.currentSplitId);
        this.metrics.incrementFinishedSplits(1L);
        this.metrics.incrementFinishedBytes(calculateBytes(this.currentSplit));
        this.currentSplitId = null;
        return finishedSplit;
    }
}
