package org.apache.iceberg.actions;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/actions/RemoveOrphanFilesAction.class */
/**
 * Spark action that finds and deletes files under a location that are not matched by any of the
 * table's known data or metadata files.
 *
 * <p>The action lists every file below {@link #location(String) location} whose modification time
 * is strictly older than {@link #olderThan(long) olderThanTimestamp}, left-anti-joins that listing
 * against the union of the data-file and metadata-file datasets supplied by the base action, and
 * passes every unmatched path to the {@link #deleteWith(Consumer) delete function}.
 *
 * <p>Defaults: the location is the table location, the age threshold is three days before this
 * object was created, and deletion goes through {@code table.io().deleteFile(path)}.
 *
 * <p>Construction fails with a {@link ValidationException} when the table property
 * {@link TableProperties#GC_ENABLED} is set to {@code false}.
 */
public class RemoveOrphanFilesAction extends BaseSparkAction<List<String>> {
    private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);

    /** Name of the single string column carried by every dataset built in this class. */
    private static final String FILE_PATH_COLUMN = "file_path";

    /** Directory levels listed directly on the driver before the rest is handed to executors. */
    private static final int MAX_DRIVER_LISTING_DEPTH = 3;

    /** A directory with more direct sub-directories than this is not descended into on the driver. */
    private static final int MAX_DRIVER_LISTING_DIRECT_SUB_DIRS = 10;

    /** Directory levels an executor task may descend before the listing is treated as failed. */
    private static final int MAX_EXECUTOR_LISTING_DEPTH = 2000;

    /** Executors always descend, regardless of how many sub-directories a directory has. */
    private static final int MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS = Integer.MAX_VALUE;

    /**
     * UDF returning the last path segment (the file name) of a path string.
     *
     * <p>The paths handled here come from Hadoop {@link Path#toString()} and from table metadata,
     * which use {@code '/'} on every platform, so the Hadoop separator is used instead of the
     * OS-specific {@code File.separator}.
     */
    private static final UserDefinedFunction filename = functions.udf((String path) -> {
        int lastIndex = path.lastIndexOf(Path.SEPARATOR);
        return lastIndex == -1 ? path : path.substring(lastIndex + 1);
    }, DataTypes.StringType);

    private final SparkSession spark;
    private final JavaSparkContext sparkContext;
    private final SerializableConfiguration hadoopConf;
    private final int partitionDiscoveryParallelism;
    private final Table table;
    private final TableOperations ops;
    private String location;
    private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);

    // Kept as an anonymous class (not a lambda): it reads the final field `table`, which is only
    // assigned later in the constructor, and is resolved lazily when accept() is invoked.
    private Consumer<String> deleteFunc = new Consumer<String>() {
        @Override
        public void accept(String file) {
            RemoveOrphanFilesAction.this.table.io().deleteFile(file);
        }
    };

    /**
     * Creates the action for the given table.
     *
     * @param sparkSession session used for listing, joining and for the Hadoop configuration
     * @param table table to clean; must also implement {@link HasTableOperations}
     * @throws ValidationException if the table property {@link TableProperties#GC_ENABLED} is false
     */
    public RemoveOrphanFilesAction(SparkSession sparkSession, Table table) {
        this.spark = sparkSession;
        this.sparkContext = new JavaSparkContext(sparkSession.sparkContext());
        this.hadoopConf = new SerializableConfiguration(sparkSession.sessionState().newHadoopConf());
        this.partitionDiscoveryParallelism =
                sparkSession.sessionState().conf().parallelPartitionDiscoveryParallelism();
        this.table = table;
        this.ops = ((HasTableOperations) table).operations();
        this.location = table.location();

        ValidationException.check(
                PropertyUtil.propertyAsBoolean(table.properties(), TableProperties.GC_ENABLED, true),
                "Cannot remove orphan files: GC is disabled (deleting files may corrupt other tables)");
    }

    @Override
    protected Table table() {
        return this.table;
    }

    /**
     * Overrides the root location that is scanned for orphan files.
     *
     * @param str location to list instead of the table location
     * @return this action, for chaining
     */
    public RemoveOrphanFilesAction location(String str) {
        this.location = str;
        return this;
    }

    /**
     * Sets the age threshold: only files whose modification time is strictly less than this
     * timestamp are considered for removal.
     *
     * @param j threshold compared against {@link FileStatus#getModificationTime()}
     * @return this action, for chaining
     */
    public RemoveOrphanFilesAction olderThan(long j) {
        this.olderThanTimestamp = j;
        return this;
    }

    /**
     * Replaces the function that is invoked with each orphan file path.
     *
     * @param consumer function called once per orphan file path
     * @return this action, for chaining
     */
    public RemoveOrphanFilesAction deleteWith(Consumer<String> consumer) {
        this.deleteFunc = consumer;
        return this;
    }

    /**
     * Finds the orphan files and passes each of them to the delete function.
     *
     * <p>A listed file is kept when some valid file has the same file name and its path is
     * contained in the listed file's path; all other listed files are treated as orphans. Failures
     * of the delete function are logged and suppressed, so the returned list may include files
     * that could not be deleted.
     *
     * @return the paths of all files identified as orphans
     */
    @Override
    public List<String> execute() {
        Dataset<Row> validDataFileDF = buildValidDataFileDF(spark);
        Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(spark, table, ops);
        Dataset<Row> validFileDF = validDataFileDF.union(validMetadataFileDF);
        Dataset<Row> actualFileDF = buildActualFileDF();

        // Comparing file names first gives the join an equality predicate; the contains() check
        // then confirms the match on the path itself.
        Column nameEqual = filename.apply(actualFileDF.col(FILE_PATH_COLUMN))
                .equalTo(filename.apply(validFileDF.col(FILE_PATH_COLUMN)));
        Column actualContains =
                actualFileDF.col(FILE_PATH_COLUMN).contains(validFileDF.col(FILE_PATH_COLUMN));
        Column joinCond = nameEqual.and(actualContains);

        List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
                .as(Encoders.STRING())
                .collectAsList();

        Tasks.foreach(orphanFiles)
                .noRetry()
                .suppressFailureWhenFinished()
                .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
                .run(deleteFunc::accept);

        return orphanFiles;
    }

    /**
     * Lists all files under {@code location} that are older than the configured threshold.
     *
     * <p>The first few directory levels are listed on the driver. Directories that were not
     * descended into there are distributed to executors and listed in parallel.
     *
     * @return a dataset with a single string column {@code file_path}
     */
    private Dataset<Row> buildActualFileDF() {
        List<String> subDirs = Lists.newArrayList();
        List<String> matchingFiles = Lists.newArrayList();

        Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;

        listDirRecursively(
                location,
                predicate,
                hadoopConf.value(),
                MAX_DRIVER_LISTING_DEPTH,
                MAX_DRIVER_LISTING_DIRECT_SUB_DIRS,
                subDirs,
                matchingFiles);

        JavaRDD<String> matchingFileRDD = sparkContext.parallelize(matchingFiles, 1);

        if (subDirs.isEmpty()) {
            // Everything was listed on the driver; no distributed listing is needed.
            return spark.createDataset(matchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH_COLUMN);
        }

        int parallelism = Math.min(subDirs.size(), partitionDiscoveryParallelism);
        JavaRDD<String> subDirRDD = sparkContext.parallelize(subDirs, parallelism);

        Broadcast<SerializableConfiguration> conf = sparkContext.broadcast(hadoopConf);
        JavaRDD<String> matchingLeafFileRDD =
                subDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp));

        JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
        return spark.createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH_COLUMN);
    }

    /**
     * Recursively lists a directory, collecting matching files and the directories that were left
     * unexplored.
     *
     * <p>Entries are listed through the filter returned by {@link HiddenPathFilter#get()}. Recursion
     * stops, and the directory is added to {@code remainingSubDirs} instead, when either
     * {@code maxDepth} is exhausted or a directory has more than {@code maxDirectSubDirs} direct
     * sub-directories.
     *
     * @param dir directory to list
     * @param predicate files are collected only when this test passes
     * @param conf Hadoop configuration used to resolve the file system
     * @param maxDepth remaining number of levels that may be listed; {@code <= 0} lists nothing
     * @param maxDirectSubDirs largest number of direct sub-directories that is still descended into
     * @param remainingSubDirs output: directories that were not listed
     * @param matchingFiles output: paths of files accepted by {@code predicate}
     * @throws RuntimeIOException if the file system lookup or listing fails
     */
    public static void listDirRecursively(
            String dir,
            Predicate<FileStatus> predicate,
            Configuration conf,
            int maxDepth,
            int maxDirectSubDirs,
            List<String> remainingSubDirs,
            List<String> matchingFiles) {
        // Depth budget exhausted: hand the directory back to the caller untouched.
        if (maxDepth <= 0) {
            remainingSubDirs.add(dir);
            return;
        }

        try {
            Path path = new Path(dir);
            FileSystem fs = path.getFileSystem(conf);

            List<String> subDirs = Lists.newArrayList();
            for (FileStatus file : fs.listStatus(path, HiddenPathFilter.get())) {
                if (file.isDirectory()) {
                    subDirs.add(file.getPath().toString());
                } else if (file.isFile() && predicate.test(file)) {
                    matchingFiles.add(file.getPath().toString());
                }
            }

            // Too wide to walk here: return the children so they can be listed elsewhere.
            if (subDirs.size() > maxDirectSubDirs) {
                remainingSubDirs.addAll(subDirs);
                return;
            }

            for (String subDir : subDirs) {
                listDirRecursively(
                        subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, matchingFiles);
            }
        } catch (IOException e) {
            throw new RuntimeIOException(e);
        }
    }

    /**
     * Builds the per-partition function that lists a batch of directories on an executor.
     *
     * @param conf broadcast Hadoop configuration
     * @param olderThanTimestamp only files with a modification time strictly below this are returned
     * @return a function mapping directory paths to the matching file paths below them; it throws a
     *     {@link RuntimeException} if any directory is still unlisted after the maximum depth
     */
    private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
            Broadcast<SerializableConfiguration> conf, long olderThanTimestamp) {
        return dirs -> {
            List<String> subDirs = Lists.newArrayList();
            List<String> files = Lists.newArrayList();

            Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;

            dirs.forEachRemaining(dir ->
                    listDirRecursively(
                            dir,
                            predicate,
                            conf.value().value(),
                            MAX_EXECUTOR_LISTING_DEPTH,
                            MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS,
                            subDirs,
                            files));

            // Anything left over means the depth limit was hit; fail rather than miss files.
            if (!subDirs.isEmpty()) {
                throw new RuntimeException(
                        "Could not list subdirectories, reached maximum subdirectory depth: "
                                + MAX_EXECUTOR_LISTING_DEPTH);
            }

            return files.iterator();
        };
    }
}
