package com.qwlabs.tq.services;

import com.google.common.base.Throwables;
import com.qwlabs.tq.models.CleanupTaskQueueCommand;
import com.qwlabs.tq.models.ProcessStatus;
import com.qwlabs.tq.models.TaskQueuePriorities;
import com.qwlabs.tq.models.TaskQueueRecord;
import com.qwlabs.tq.repositories.TaskQueueRecordRepository;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.narayana.jta.TransactionExceptionResult;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:com/qwlabs/tq/services/TaskQueue.class */
public class TaskQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskQueue.class);
    private final TaskQueueRecordRepository repository;

    @Inject
    public TaskQueue(TaskQueueRecordRepository taskQueueRecordRepository) {
        this.repository = taskQueueRecordRepository;
    }

    public <R extends TaskQueueRecord> void enqueue(R r) {
        QuarkusTransaction.joiningExisting().run(() -> {
            r.setPriority(TaskQueuePriorities.NEW);
            r.setProcessStatus(ProcessStatus.IDLE);
            r.setProcessStartAt(null);
            r.setProcessEndAt(null);
            this.repository.persist(r);
        });
    }

    @Retry(delay = 1, delayUnit = ChronoUnit.SECONDS)
    public void cleanup(CleanupTaskQueueCommand cleanupTaskQueueCommand) {
        cleanupTaskQueueCommand.getTopics().forEach(str -> {
            cleanupTaskQueueCommand.getStatuses().forEach(processStatus -> {
                QuarkusTransaction.requiringNew().run(() -> {
                    LOGGER.warn("Cleanup task queue count: {}", Integer.valueOf(this.repository.cleanup(str, cleanupTaskQueueCommand.getBucket(), processStatus)));
                });
            });
        });
    }

    public void onBefore(TaskQueueProcessContext taskQueueProcessContext) {
        QuarkusTransaction.requiringNew().exceptionHandler(th -> {
            return TransactionExceptionResult.ROLLBACK;
        }).run(() -> {
            this.repository.resetByTimeout(taskQueueProcessContext.getTopic(), taskQueueProcessContext.getBucket(), Instant.now().minus((TemporalAmount) taskQueueProcessContext.getTimeout()), TaskQueuePriorities.PROCESSING_TIMEOUT);
            this.repository.resetByStatus(taskQueueProcessContext.getTopic(), taskQueueProcessContext.getBucket(), ProcessStatus.FAILED, TaskQueuePriorities.POSTPONED_TO_IDLE);
            this.repository.resetByStatus(taskQueueProcessContext.getTopic(), taskQueueProcessContext.getBucket(), ProcessStatus.POSTPONED, TaskQueuePriorities.FAILED_TO_IDLE);
        });
    }

    public <R extends TaskQueueRecord> String poll(TaskQueueProcessContext taskQueueProcessContext) {
        return (String) QuarkusTransaction.requiringNew().call(() -> {
            Optional<U> map;
            do {
                Optional<String> peekId = this.repository.peekId(taskQueueProcessContext.getTopic(), taskQueueProcessContext.getBucket());
                TaskQueueRecordRepository taskQueueRecordRepository = this.repository;
                Objects.requireNonNull(taskQueueRecordRepository);
                map = peekId.map(taskQueueRecordRepository::lock);
            } while (!((Boolean) map.map(taskQueueRecord -> {
                return Boolean.valueOf(taskQueueRecord.getProcessStatus() == ProcessStatus.IDLE);
            }).orElse(true)).booleanValue());
            map.ifPresent(taskQueueRecord2 -> {
                taskQueueRecord2.setProcessStatus(ProcessStatus.PROCESSING);
                taskQueueRecord2.setProcessStartAt(Instant.now());
                taskQueueRecord2.setProcessEndAt(null);
                this.repository.persist(taskQueueRecord2);
            });
            return (String) map.map((v0) -> {
                return v0.getId();
            }).orElse(null);
        });
    }

    public <R extends TaskQueueRecord> void onWork(String str, Function<R, Boolean> function) {
        QuarkusTransaction.requiringNew().run(() -> {
            TaskQueueRecord find = this.repository.find(str);
            if (Objects.isNull(find)) {
                throw new RuntimeException("Can not onWork because record: %s is null.".formatted(str));
            }
            find.setProcessStatus(((Boolean) function.apply(find)).booleanValue() ? ProcessStatus.SUCCEED : ProcessStatus.POSTPONED);
            find.setProcessEndAt(Instant.now());
            this.repository.persist(find);
            this.repository.clear();
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <R extends TaskQueueRecord> void onWorkWithOutTx(String str, Function<R, Boolean> function) {
        TaskQueueRecord find = this.repository.find(str);
        if (Objects.isNull(find)) {
            throw new RuntimeException("Can not onWork because record: %s is null.".formatted(str));
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        try {
            atomicBoolean.set(((Boolean) function.apply(find)).booleanValue());
            QuarkusTransaction.requiringNew().run(() -> {
                TaskQueueRecord find2 = this.repository.find(str);
                find2.setProcessStatus(atomicBoolean.get() ? ProcessStatus.SUCCEED : ProcessStatus.POSTPONED);
                find2.setProcessEndAt(Instant.now());
                this.repository.persist(find2);
            });
        } catch (Throwable th) {
            QuarkusTransaction.requiringNew().run(() -> {
                TaskQueueRecord find2 = this.repository.find(str);
                find2.setProcessStatus(atomicBoolean.get() ? ProcessStatus.SUCCEED : ProcessStatus.POSTPONED);
                find2.setProcessEndAt(Instant.now());
                this.repository.persist(find2);
            });
            throw th;
        }
    }

    public <R extends TaskQueueRecord> void onFailed(TaskQueueProcessContext taskQueueProcessContext, String str, Exception exc) {
        onFailed(taskQueueProcessContext, str, exc, (taskQueueRecord, exc2) -> {
            LOGGER.error("Process task error. id={}", taskQueueRecord.getId(), exc2);
        });
    }

    public <R extends TaskQueueRecord> void onFailed(TaskQueueProcessContext taskQueueProcessContext, String str, Exception exc, BiConsumer<R, Exception> biConsumer) {
        QuarkusTransaction.requiringNew().run(() -> {
            TaskQueueRecord find = this.repository.find(str);
            if (Objects.isNull(find)) {
                LOGGER.error("Can not onFailed because record is null. can not found recordId {}.", str, exc);
                return;
            }
            find.setFailedMessage(Throwables.getStackTraceAsString(exc));
            find.setProcessStatus(ProcessStatus.FAILED);
            find.setProcessEndAt(Instant.now());
            this.repository.persist(find);
            taskQueueProcessContext.markFailedRecord(find.getId());
            biConsumer.accept(find, exc);
        });
    }
}
