package name.nkonev.r2dbc.migrate.core;

import dev.miku.r2dbc.mysql.MySqlConnectionFactoryProvider;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider;
import io.r2dbc.spi.Batch;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import io.r2dbc.spi.ValidationDepth;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.logging.Level;
import java.util.stream.Collectors;
import name.nkonev.r2dbc.migrate.core.FilenameParser;
import name.nkonev.r2dbc.migrate.reader.MigrateResource;
import name.nkonev.r2dbc.migrate.reader.MigrateResourceReader;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;

/* loaded from: input_file:BOOT-INF/lib/r2dbc-migrate-core-2.9.4.jar:name/nkonev/r2dbc/migrate/core/R2dbcMigrate.class */
/**
 * Reactive SQL migration runner built on the R2DBC SPI and Project Reactor.
 *
 * <p>The single entry point is {@link #migrate}. It optionally waits for the database to answer a
 * validation query, applies "premigration" scripts on a dedicated connection, and then, on another
 * connection, creates the internal tables, acquires the migration lock, applies every script whose
 * version is greater than the version recorded in the database, and releases the lock.
 *
 * <p>All members are static; the class is never instantiated.
 */
public abstract class R2dbcMigrate {
    private static final Logger LOGGER = Loggers.getLogger(R2dbcMigrate.class);
    private static final String ROWS_UPDATED = "By '{}' {} rows updated";

    /** Delegates to {@code migrateResourceReader.getResources(str)}; {@code str} is the resources path. */
    private static List<MigrateResource> getResources(String str, MigrateResourceReader migrateResourceReader) {
        return migrateResourceReader.getResources(str);
    }

    /**
     * Resolves the SQL dialect to use.
     *
     * <p>An explicitly configured dialect wins. Otherwise the dialect is guessed from the database
     * product name reported by the connection metadata.
     *
     * @param r2dbcMigrateProperties configuration; {@code getDialect()} may be {@code null}
     * @param connection connection whose metadata is inspected when no dialect is configured
     * @return the resolved dialect
     * @throws RuntimeException if the configured dialect is not supported or the product name is
     *     missing or not recognized
     */
    private static Dialect getSqlDialect(R2dbcMigrateProperties r2dbcMigrateProperties, Connection connection) {
        if (r2dbcMigrateProperties.getDialect() != null) {
            switch (r2dbcMigrateProperties.getDialect()) {
                case POSTGRESQL:
                    return Dialect.POSTGRESQL;
                case MSSQL:
                    return Dialect.MSSQL;
                case MYSQL:
                    return Dialect.MYSQL;
                case H2:
                    return Dialect.H2;
                case MARIADB:
                    return Dialect.MARIADB;
                default:
                    throw new RuntimeException("Unsupported dialect: " + r2dbcMigrateProperties.getDialect());
            }
        }

        // Locale.ROOT keeps the lower-casing independent of the JVM default locale, so the
        // substring checks below behave the same on every host.
        Optional<String> productName = Optional.ofNullable(connection.getMetadata())
                .map(metadata -> metadata.getDatabaseProductName())
                .map(name -> name.toLowerCase(Locale.ROOT));

        if (productName.isPresent()) {
            String name = productName.get();
            // NOTE(review): the driver/Netty constants below look like decompiler substitutions
            // for plain string literals that happen to share the same value - confirm against
            // the original sources and inline the literals if so.
            if (name.contains(PostgresqlConnectionFactoryProvider.LEGACY_POSTGRESQL_DRIVER)) {
                return Dialect.POSTGRESQL;
            }
            if (name.contains("microsoft")) {
                return Dialect.MSSQL;
            }
            if (name.contains(MySqlConnectionFactoryProvider.MYSQL_DRIVER)) {
                return Dialect.MYSQL;
            }
            if (name.contains(ApplicationProtocolNames.HTTP_2)) {
                return Dialect.H2;
            }
            if (name.contains("maria")) {
                return Dialect.MARIADB;
            }
        }
        throw new RuntimeException("Cannot recognize dialect. Try to set it explicitly.");
    }

    /** Switches the connection to auto-commit mode and then subscribes to {@code publisher}. */
    private static <T> Flux<T> withAutoCommit(Connection connection, Publisher<T> publisher) {
        return Mono.from(connection.setAutoCommit(true)).thenMany(publisher);
    }

    /**
     * Runs the statements behind {@code publisher}, sums their updated-row counts and logs the total.
     *
     * <p>Consuming {@code getRowsUpdated()} of every result is what actually drives the statements.
     *
     * <p>NOTE(review): the decompiler reports this method was originally {@code private}; the
     * widened visibility is kept so existing callers keep compiling.
     *
     * @param connection connection the statements run on
     * @param z {@code true} to wrap the work in begin/commit; {@code false} to run it in auto-commit mode
     * @param publisher results of the statements to execute
     * @param str label used in the "rows updated" log line
     * @return a {@code Mono} completing once the work (and the commit, if any) is done
     */
    public static Mono<Void> transactionalWrap(Connection connection, boolean z, Publisher<? extends Result> publisher, String str) {
        Mono<Long> rowsUpdated = Flux.from(publisher)
                .flatMap(Result::getRowsUpdated)
                .switchIfEmpty(Mono.just(0L))
                .reduceWith(() -> 0L, Long::sum)
                .doOnSuccess(total -> LOGGER.info(ROWS_UPDATED, str, total));

        if (z) {
            return Mono.from(connection.beginTransaction())
                    .then(rowsUpdated)
                    .then(Mono.from(connection.commitTransaction()));
        }
        return withAutoCommit(connection, rowsUpdated).then();
    }

    /**
     * Same transactional/auto-commit wrapping as {@link #transactionalWrap}, but for an arbitrary
     * publisher whose items are simply drained.
     */
    private static <T> Mono<Void> transactionalWrapUnchecked(Connection connection, boolean z, Publisher<T> publisher) {
        Flux<T> work = Flux.from(publisher);
        if (z) {
            return Mono.from(connection.beginTransaction())
                    .thenMany(work)
                    .then(Mono.from(connection.commitTransaction()));
        }
        return withAutoCommit(connection, work).then();
    }

    /**
     * Waits until the database accepts a connection and answers the validation query with the
     * expected value, retrying with a fixed delay.
     *
     * <p>Each attempt opens a fresh connection, validates it remotely, runs the configured
     * validation query and compares the {@code validation_result} column with the expected value.
     * The attempt is bounded by the configured timeout.
     *
     * @return an empty {@code Mono} immediately when waiting is disabled; otherwise a {@code Mono}
     *     completing on the first successful attempt or failing once retries are exhausted
     */
    private static Mono<Void> waitForDatabase(ConnectionFactory connectionFactory, R2dbcMigrateProperties r2dbcMigrateProperties) {
        if (!r2dbcMigrateProperties.isWaitForDatabase()) {
            return Mono.empty();
        }
        return Mono.usingWhen(
                Mono.defer(() -> {
                    LOGGER.info("Creating new test connection");
                    return Mono.from(connectionFactory.create());
                }),
                connection -> Flux.from(connection.validate(ValidationDepth.REMOTE))
                        .filter(Boolean.TRUE::equals)
                        .switchIfEmpty(Mono.error(new RuntimeException("Connection is not valid")))
                        .then(Flux.from(connection.createStatement(r2dbcMigrateProperties.getValidationQuery()).execute())
                                .flatMap(result -> result.map(
                                        getResultSafely("validation_result", String.class, "__VALIDATION_RESULT_NOT_PROVIDED")))
                                .filter(actual -> {
                                    LOGGER.info("Comparing expected value '{}' with provided result '{}'",
                                            r2dbcMigrateProperties.getValidationQueryExpectedResultValue(), actual);
                                    return r2dbcMigrateProperties.getValidationQueryExpectedResultValue().equals(actual);
                                })
                                .switchIfEmpty(Mono.error(new RuntimeException("Not matched result of test query")))
                                .last()),
                connection -> {
                    LOGGER.info("Closing test connection");
                    return connection.close();
                })
                .log("R2dbcMigrateCreatingTestConnection", Level.FINE)
                .timeout(r2dbcMigrateProperties.getValidationQueryTimeout())
                .retryWhen(Retry.fixedDelay(
                                r2dbcMigrateProperties.getConnectionMaxRetries(),
                                r2dbcMigrateProperties.getValidationRetryDelay())
                        .doBeforeRetry(retrySignal -> LOGGER.warn(
                                "Retrying to get database connection due {}: {}",
                                retrySignal.failure().getClass(), retrySignal.failure().getMessage())))
                .doOnSuccess(queryResult -> LOGGER.info("Successfully got result '{}' of test query", queryResult))
                .then();
    }

    /**
     * Runs the whole migration.
     *
     * <p>Steps, in order: wait for the database (if enabled), apply premigration scripts, then on
     * a new connection run {@link #doWork}. If that last step fails, the lock is released on yet
     * another connection and the original error is re-emitted.
     *
     * @param connectionFactory source of connections
     * @param r2dbcMigrateProperties configuration; nothing happens when {@code isEnable()} is false
     * @param migrateResourceReader locates the SQL scripts
     * @param sqlQueries custom queries, or {@code null} to pick the implementation by dialect
     * @param locker custom locker, or {@code null} to pick the implementation by dialect
     * @return a {@code Mono} completing when every pending migration has been applied
     */
    public static Mono<Void> migrate(ConnectionFactory connectionFactory, R2dbcMigrateProperties r2dbcMigrateProperties, MigrateResourceReader migrateResourceReader, SqlQueries sqlQueries, Locker locker) {
        LOGGER.info("Configured with {}", r2dbcMigrateProperties);
        if (!r2dbcMigrateProperties.isEnable()) {
            return Mono.empty();
        }

        List<Tuple2<MigrateResource, FilenameParser.MigrationInfo>> fileResources = getFileResources(r2dbcMigrateProperties, migrateResourceReader);
        LOGGER.info("Found {} sql scripts, see details below", fileResources.size());

        List<Tuple2<MigrateResource, FilenameParser.MigrationInfo>> premigrations = fileResources.stream()
                .filter(script -> script.getT2().isPremigration())
                .collect(Collectors.toList());
        LOGGER.info("Found {} premigration sql scripts", premigrations.size());

        List<Tuple2<MigrateResource, FilenameParser.MigrationInfo>> migrations = fileResources.stream()
                .filter(script -> !script.getT2().isPremigration())
                .collect(Collectors.toList());
        LOGGER.info("Found {} migration sql scripts", migrations.size());

        return waitForDatabase(connectionFactory, r2dbcMigrateProperties)
                .then(premigrate(connectionFactory, r2dbcMigrateProperties, premigrations))
                .then(Mono.usingWhen(
                                connectionFactory.create(),
                                connection -> doWork(connection, r2dbcMigrateProperties, migrations, sqlQueries, locker),
                                Connection::close)
                        // Release the lock on a fresh connection, then surface the original failure.
                        .onErrorResume(th -> releaseLockAfterError(th, connectionFactory, r2dbcMigrateProperties, locker)
                                .then(Mono.error(th))));
    }

    /** Creates the internal tables required by both the queries and the locker, in one transaction. */
    private static Mono<Void> ensureInternals(Connection connection, SqlQueries sqlQueries, Locker locker) {
        Batch batch = connection.createBatch();
        sqlQueries.createInternalTables().forEach(batch::add);
        locker.createInternalTables().forEach(batch::add);
        return transactionalWrap(connection, true, batch.execute(), "Making internal tables");
    }

    /**
     * Tries to acquire the migration lock, retrying with a fixed delay while
     * {@code locker.extractResultOrError} reports an error.
     */
    private static Mono<Void> acquireOrWaitForLock(Connection connection, Locker locker, R2dbcMigrateProperties r2dbcMigrateProperties) {
        return transactionalWrapUnchecked(
                connection,
                true,
                locker.extractResultOrError(Mono.from(locker.tryAcquireLock(connection).execute()))
                        .retryWhen(Retry.fixedDelay(
                                        r2dbcMigrateProperties.getAcquireLockMaxRetries(),
                                        r2dbcMigrateProperties.getAcquireLockRetryDelay())
                                .doAfterRetry(retrySignal -> LOGGER.warn("Waiting for lock"))));
    }

    /**
     * Lists the readable, non-null resources under {@code str} and pairs each one with the
     * migration info parsed from its file name.
     */
    private static List<Tuple2<MigrateResource, FilenameParser.MigrationInfo>> getResourcesFromPath(String str, MigrateResourceReader migrateResourceReader) {
        return getResources(str, migrateResourceReader).stream()
                .filter(Objects::nonNull)
                .filter(MigrateResource::isReadable)
                .map(resource -> {
                    LOGGER.debug("Reading {}", resource);
                    return Tuples.of(resource, FilenameParser.getMigrationInfo(resource.getFilename()));
                })
                .collect(Collectors.toList());
    }

    /** Collects the scripts from every configured resources path, sorted by ascending version. */
    private static List<Tuple2<MigrateResource, FilenameParser.MigrationInfo>> getFileResources(R2dbcMigrateProperties r2dbcMigrateProperties, MigrateResourceReader migrateResourceReader) {
        List<Tuple2<MigrateResource, FilenameParser.MigrationInfo>> collected = new ArrayList<>();
        for (String path : r2dbcMigrateProperties.getResourcesPaths()) {
            collected.addAll(getResourcesFromPath(path, migrateResourceReader));
        }
        return collected.stream()
                .sorted((left, right) -> Integer.compare(left.getT2().getVersion(), right.getT2().getVersion()))
                .peek(script -> LOGGER.debug("From {} parsed metadata {}", script.getT1(), script.getT2()))
                .collect(Collectors.toList());
    }

    /** Releases the migration lock inside a transaction. */
    private static Mono<Void> releaseLock(Connection connection, Locker locker) {
        return transactionalWrap(connection, true, locker.releaseLock(connection).execute(), "Releasing lock");
    }

    /**
     * Logs the migration failure and releases the lock on a newly created connection, in
     * auto-commit mode.
     *
     * @param th the failure that interrupted the migration (only logged here)
     */
    private static Mono<Void> releaseLockAfterError(Throwable th, ConnectionFactory connectionFactory, R2dbcMigrateProperties r2dbcMigrateProperties, Locker locker) {
        LOGGER.error("Got error during migration, will release lock", th);
        return Mono.usingWhen(
                connectionFactory.create(),
                connection -> {
                    Locker effectiveLocker = getUserOrDeterminedLocker(
                            getSqlDialect(r2dbcMigrateProperties, connection), r2dbcMigrateProperties, locker);
                    return transactionalWrap(
                            connection, false, effectiveLocker.releaseLock(connection).execute(), "Releasing lock after error");
                },
                Connection::close);
    }

    /**
     * Applies the premigration scripts one after another on a dedicated connection.
     *
     * <p>Unlike regular migrations, no lock is taken and no metadata is written for these scripts.
     *
     * @return an empty {@code Mono} when {@code list} is empty
     */
    private static Mono<Void> premigrate(ConnectionFactory connectionFactory, R2dbcMigrateProperties r2dbcMigrateProperties, List<Tuple2<MigrateResource, FilenameParser.MigrationInfo>> list) {
        if (list.isEmpty()) {
            return Mono.empty();
        }
        return Mono.usingWhen(
                Mono.defer(() -> {
                    LOGGER.info("Creating new premigration connection");
                    return Mono.from(connectionFactory.create());
                }),
                connection -> Flux.fromIterable(list)
                        .concatMap(
                                script -> makeMigration(connection, r2dbcMigrateProperties, script)
                                        .log("R2dbcMigrateMakePreMigrationWork", Level.FINE),
                                1)
                        .then(),
                connection -> {
                    LOGGER.info("Closing premigration connection");
                    return connection.close();
                })
                .log("R2dbcMigrateCreatingPreMigrationConnection", Level.FINE);
    }

    /**
     * Performs the locked part of the migration on {@code connection}: ensure internal tables,
     * acquire the lock, read the current database version, apply and record every newer script
     * sequentially, then release the lock.
     *
     * <p>NOTE(review): the decompiler reports this method was originally {@code private}; the
     * widened visibility is kept so existing callers keep compiling.
     *
     * @param list regular (non-premigration) scripts, expected in ascending version order
     * @param sqlQueries custom queries or {@code null} for the dialect default
     * @param locker custom locker or {@code null} for the dialect default
     */
    public static Mono<Void> doWork(Connection connection, R2dbcMigrateProperties r2dbcMigrateProperties, List<Tuple2<MigrateResource, FilenameParser.MigrationInfo>> list, SqlQueries sqlQueries, Locker locker) {
        Dialect sqlDialect = getSqlDialect(r2dbcMigrateProperties, connection);
        SqlQueries effectiveQueries = getUserOrDeterminedSqlQueries(sqlDialect, r2dbcMigrateProperties, sqlQueries);
        Locker effectiveLocker = getUserOrDeterminedLocker(sqlDialect, r2dbcMigrateProperties, locker);

        // The three stages are assembled in this exact order because each helper creates its
        // statements eagerly.
        Mono<Void> internalsReady = ensureInternals(connection, effectiveQueries, effectiveLocker)
                .log("R2dbcMigrateEnsuringInternals", Level.FINE);
        Mono<Void> lockAcquired = acquireOrWaitForLock(connection, effectiveLocker, r2dbcMigrateProperties)
                .log("R2dbcMigrateAcquiringLock", Level.FINE);
        Mono<Integer> databaseVersion = getDatabaseVersionOrZero(effectiveQueries, connection, r2dbcMigrateProperties)
                .log("R2dbcMigrateGetDatabaseVersion", Level.FINE);

        return internalsReady
                .then(lockAcquired)
                .then(databaseVersion)
                .flatMap(currentVersion -> {
                    LOGGER.info("Database version is {}", currentVersion);
                    return Flux.fromIterable(list)
                            .log("R2dbcMigrateRequestingMigrationFiles", Level.FINE)
                            .filter(candidate -> candidate.getT2().getVersion() > currentVersion.intValue())
                            .concatMap(
                                    pending -> makeMigration(connection, r2dbcMigrateProperties, pending)
                                            .log("R2dbcMigrateMakeMigrationWork", Level.FINE)
                                            .then(writeMigrationMetadata(connection, effectiveQueries, pending)
                                                    .log("R2dbcMigrateWritingMigrationMetadata", Level.FINE)),
                                    1)
                            .then(releaseLock(connection, effectiveLocker)
                                    .log("R2dbcMigrateReleasingLock", Level.FINE));
                });
    }

    /** Returns {@code sqlQueries} when non-null, otherwise the implementation matching {@code dialect}. */
    private static SqlQueries getUserOrDeterminedSqlQueries(Dialect dialect, R2dbcMigrateProperties r2dbcMigrateProperties, SqlQueries sqlQueries) {
        SqlQueries resolved = Objects.requireNonNullElseGet(
                sqlQueries, () -> defaultSqlQueries(dialect, r2dbcMigrateProperties));
        LOGGER.debug("Instantiated {}", resolved.getClass());
        return resolved;
    }

    /** Builds the dialect-specific queries from the configured migrations schema and table. */
    private static SqlQueries defaultSqlQueries(Dialect dialect, R2dbcMigrateProperties r2dbcMigrateProperties) {
        String schema = r2dbcMigrateProperties.getMigrationsSchema();
        String table = r2dbcMigrateProperties.getMigrationsTable();
        switch (dialect) {
            case POSTGRESQL:
                return new PostgreSqlQueries(schema, table);
            case MSSQL:
                return new MSSqlQueries(schema, table);
            case MYSQL:
                return new MySqlQueries(schema, table);
            case H2:
                return new H2Queries(schema, table);
            case MARIADB:
                return new MariadbQueries(schema, table);
            default:
                throw new IllegalStateException("Unsupported dialect: " + dialect);
        }
    }

    /** Returns {@code locker} when non-null, otherwise the implementation matching {@code dialect}. */
    private static Locker getUserOrDeterminedLocker(Dialect dialect, R2dbcMigrateProperties r2dbcMigrateProperties, Locker locker) {
        Locker resolved = Objects.requireNonNullElseGet(
                locker, () -> defaultLocker(dialect, r2dbcMigrateProperties));
        LOGGER.debug("Instantiated {}", resolved.getClass());
        return resolved;
    }

    /**
     * Builds the dialect-specific locker. For PostgreSQL, MySQL and MariaDB the
     * {@code isPreferDbSpecificLock()} flag selects the database-specific locker over the
     * table-based one.
     */
    private static Locker defaultLocker(Dialect dialect, R2dbcMigrateProperties r2dbcMigrateProperties) {
        switch (dialect) {
            case POSTGRESQL:
                return r2dbcMigrateProperties.isPreferDbSpecificLock()
                        ? new PostgreSqlAdvisoryLocker(r2dbcMigrateProperties.getMigrationsSchema(), r2dbcMigrateProperties.getMigrationsLockTable())
                        : new PostgreSqlTableLocker(r2dbcMigrateProperties.getMigrationsSchema(), r2dbcMigrateProperties.getMigrationsLockTable());
            case MSSQL:
                return new MSSqlTableLocker(r2dbcMigrateProperties.getMigrationsSchema(), r2dbcMigrateProperties.getMigrationsLockTable());
            case MYSQL:
                return r2dbcMigrateProperties.isPreferDbSpecificLock()
                        ? new MySqlSessionLocker(r2dbcMigrateProperties.getMigrationsSchema(), r2dbcMigrateProperties.getMigrationsLockTable())
                        : new MySqlTableLocker(r2dbcMigrateProperties.getMigrationsSchema(), r2dbcMigrateProperties.getMigrationsLockTable());
            case H2:
                return new H2TableLocker(r2dbcMigrateProperties.getMigrationsSchema(), r2dbcMigrateProperties.getMigrationsLockTable());
            case MARIADB:
                return r2dbcMigrateProperties.isPreferDbSpecificLock()
                        ? new MariadbSessionLocker(r2dbcMigrateProperties.getMigrationsSchema(), r2dbcMigrateProperties.getMigrationsLockTable())
                        : new MariadbTableLocker(r2dbcMigrateProperties.getMigrationsSchema(), r2dbcMigrateProperties.getMigrationsLockTable());
            default:
                throw new IllegalStateException("Unsupported dialect: " + dialect);
        }
    }

    /** Applies one script, transactionally or not as its migration info dictates. */
    private static Mono<Void> makeMigration(Connection connection, R2dbcMigrateProperties r2dbcMigrateProperties, Tuple2<MigrateResource, FilenameParser.MigrationInfo> tuple2) {
        FilenameParser.MigrationInfo info = tuple2.getT2();
        LOGGER.info("Applying {}", info);
        Publisher<? extends Result> results = getMigrateResultPublisher(r2dbcMigrateProperties, connection, tuple2.getT1(), info);
        return transactionalWrap(connection, info.isTransactional(), results, info.toString());
    }

    /** Records the applied script in the migrations table, inside a transaction. */
    private static Mono<Void> writeMigrationMetadata(Connection connection, SqlQueries sqlQueries, Tuple2<MigrateResource, FilenameParser.MigrationInfo> tuple2) {
        FilenameParser.MigrationInfo info = tuple2.getT2();
        return transactionalWrap(
                connection,
                true,
                sqlQueries.createInsertMigrationStatement(connection, info).execute(),
                "Writing metadata version " + info.getVersion());
    }

    /**
     * Reads the highest applied migration version from the {@code max} column of the
     * {@code getMaxMigration()} query, falling back to 0 when the column is absent, null, or no
     * row is produced.
     *
     * @param r2dbcMigrateProperties currently unused
     */
    private static Mono<Integer> getDatabaseVersionOrZero(SqlQueries sqlQueries, Connection connection, R2dbcMigrateProperties r2dbcMigrateProperties) {
        return withAutoCommit(connection, connection.createStatement(sqlQueries.getMaxMigration()).execute())
                .flatMap(result -> Mono.from(result.map(getResultSafely("max", Integer.class, 0))))
                .switchIfEmpty(Mono.just(0))
                .last();
    }

    /**
     * Builds a row mapper that reads column {@code str} as {@code cls}.
     *
     * @param str column name
     * @param cls Java type the column value is requested as
     * @param columntype fallback returned when the column is not present in the row metadata or
     *     its value is {@code null}
     * @param <ColumnType> type of the mapped value
     * @return a mapper suitable for {@code Result.map}
     */
    public static <ColumnType> BiFunction<Row, RowMetadata, ColumnType> getResultSafely(String str, Class<ColumnType> cls, ColumnType columntype) {
        return (row, rowMetadata) -> {
            if (rowMetadata.contains(str)) {
                ColumnType value = row.get(str, cls);
                if (value != null) {
                    return value;
                }
            }
            return columntype;
        };
    }

    /** Creates a batch on {@code connection} containing every statement of {@code list}, in order. */
    static Batch makeBatch(Connection connection, List<String> list) {
        Batch batch = connection.createBatch();
        list.forEach(batch::add);
        return batch;
    }

    /**
     * Produces the results of executing one script.
     *
     * <p>A "split by line" script is read in chunks, buffered into groups of
     * {@code getChunkSize()} statements, and each group is executed as a batch, one group at a
     * time. Any other script is read whole and executed as a single statement.
     */
    private static Publisher<? extends Result> getMigrateResultPublisher(R2dbcMigrateProperties r2dbcMigrateProperties, Connection connection, MigrateResource migrateResource, FilenameParser.MigrationInfo migrationInfo) {
        if (migrationInfo.isSplitByLine()) {
            return FileReader.readChunked(migrateResource, r2dbcMigrateProperties.getFileCharset())
                    .buffer(r2dbcMigrateProperties.getChunkSize())
                    .concatMap(chunk -> {
                        LOGGER.debug("Creating batch - for {} processing {} strings", migrationInfo, chunk.size());
                        return makeBatch(connection, chunk).execute();
                    }, 1);
        }
        return connection.createStatement(FileReader.read(migrateResource, r2dbcMigrateProperties.getFileCharset())).execute();
    }
}
