package com.transferwise.tasks.processing;

import com.google.common.base.Preconditions;
import com.transferwise.common.baseutils.concurrency.LockUtils;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.common.context.Criticality;
import com.transferwise.common.context.TwContext;
import com.transferwise.common.context.TwContextClockHolder;
import com.transferwise.common.context.UnitOfWork;
import com.transferwise.common.context.UnitOfWorkManager;
import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy;
import com.transferwise.tasks.IPriorityManager;
import com.transferwise.tasks.TasksProperties;
import com.transferwise.tasks.buckets.BucketProperties;
import com.transferwise.tasks.buckets.IBucketsManager;
import com.transferwise.tasks.dao.ITaskDao;
import com.transferwise.tasks.domain.BaseTask;
import com.transferwise.tasks.domain.IBaseTask;
import com.transferwise.tasks.domain.ITask;
import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.domain.TaskStatus;
import com.transferwise.tasks.entrypoints.EntryPointsGroups;
import com.transferwise.tasks.entrypoints.EntryPointsNames;
import com.transferwise.tasks.entrypoints.IEntryPointsService;
import com.transferwise.tasks.entrypoints.IMdcService;
import com.transferwise.tasks.handler.interfaces.IAsyncTaskProcessor;
import com.transferwise.tasks.handler.interfaces.ISyncTaskProcessor;
import com.transferwise.tasks.handler.interfaces.ITaskConcurrencyPolicy;
import com.transferwise.tasks.handler.interfaces.ITaskHandler;
import com.transferwise.tasks.handler.interfaces.ITaskHandlerRegistry;
import com.transferwise.tasks.handler.interfaces.ITaskProcessingPolicy;
import com.transferwise.tasks.handler.interfaces.ITaskProcessor;
import com.transferwise.tasks.handler.interfaces.ITaskRetryPolicy;
import com.transferwise.tasks.helpers.ICoreMetricsTemplate;
import com.transferwise.tasks.helpers.executors.IExecutorsHelper;
import com.transferwise.tasks.processing.GlobalProcessingState;
import com.transferwise.tasks.processing.ITasksProcessingService;
import com.transferwise.tasks.triggering.TaskTriggering;
import com.transferwise.tasks.utils.LogUtils;
import com.transferwise.tasks.utils.WaitUtils;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.support.TransactionSynchronizationManager;

/* loaded from: input_file:com/transferwise/tasks/processing/TasksProcessingService.class */
public class TasksProcessingService implements GracefulShutdownStrategy, ITasksProcessingService, InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(TasksProcessingService.class);

    // ---- Injected collaborators ----

    @Autowired
    private ITaskDao taskDao;

    @Autowired
    private ITaskHandlerRegistry taskHandlerRegistry;

    @Autowired
    private TasksProperties tasksProperties;

    @Autowired
    private IExecutorsHelper executorsHelper;

    @Autowired
    private GlobalProcessingState globalProcessingState;

    @Autowired
    private IPriorityManager priorityManager;

    @Autowired
    private IBucketsManager bucketsManager;

    @Autowired
    private ITransactionsHelper transactionsHelper;

    @Autowired
    private UnitOfWorkManager unitOfWorkManager;

    @Autowired
    private IMdcService mdcService;

    @Autowired
    private IEntryPointsService entryPointsHelper;

    @Autowired
    private ICoreMetricsTemplate coreMetricsTemplate;

    // ---- Executors, all created in afterPropertiesSet() ----

    /** Runs the actual task processing (see scheduleTask). */
    private ExecutorService taskExecutor;
    private ExecutorService tasksProcessingExecutor;
    /** Runs the asynchronous grabbing step submitted by grabTaskForProcessing. */
    private ExecutorService tasksGrabbingExecutor;

    // ---- Lifecycle state ----

    private volatile boolean shuttingDown;
    /** Callback set via addTaskTriggeringFinishedListener; invoked by processTasks for every triggering it removes. */
    private Consumer<TaskTriggering> taskTriggeringProcessingListener;
    private Instant shutdownStartTime;

    @GuardedBy("lifecycleLock")
    private boolean processingStarted;

    /** Optional interceptors wrapped around task processing, applied in list order by processWithInterceptors. */
    @Autowired(required = false)
    private final List<ITaskProcessingInterceptor> taskProcessingInterceptors = new ArrayList<>();
    /** Incremented when processTask starts working on a task. */
    private final AtomicInteger runningTasksCount = new AtomicInteger();
    /** Number of grabbings submitted but not yet finished; exposed as a metric in afterPropertiesSet(). */
    private final AtomicInteger ongoingTasksGrabbingsCount = new AtomicInteger();
    /** Source of the sequence numbers stamped on triggerings in transferFromIntermediateBuffer. */
    private final AtomicLong taskTriggeringSequence = new AtomicLong(0);
    /** Threads currently running synchronous processors; modified while holding tasksProcessingThreadsLock. */
    private final Set<Thread> tasksProcessingThreads = new HashSet<>();
    private final Lock tasksProcessingThreadsLock = new ReentrantLock();
    private final Lock lifecycleLock = new ReentrantLock();

    /* loaded from: input_file:com/transferwise/tasks/processing/TasksProcessingService$ProcessTaskResponse.class */
    /**
     * Outcome of trying to hand one task over for grabbing.
     *
     * <p>Mutable, fluent value object: every setter returns {@code this}. Equality, hash code and
     * string form are derived from {@code result}, {@code code} and {@code tryAgainTime}.
     */
    public static class ProcessTaskResponse {
        private Result result;
        private Code code;
        private Instant tryAgainTime;

        /** Detailed reason accompanying a {@link Result}. */
        public enum Code {
            NO_HANDLER,
            NO_POLICY,
            NO_CONCURRENCY_POLICY,
            UNKNOWN_ERROR,
            HAPPY_FLOW,
            NOT_ALLOWED_ON_NODE
        }

        /** Coarse outcome of the attempt. */
        public enum Result {
            OK,
            NO_SPACE,
            ERROR
        }

        public Result getResult() {
            return this.result;
        }

        public ProcessTaskResponse setResult(Result result) {
            this.result = result;
            return this;
        }

        public Code getCode() {
            return this.code;
        }

        public ProcessTaskResponse setCode(Code code) {
            this.code = code;
            return this;
        }

        public Instant getTryAgainTime() {
            return this.tryAgainTime;
        }

        public ProcessTaskResponse setTryAgainTime(Instant instant) {
            this.tryAgainTime = instant;
            return this;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof ProcessTaskResponse)) {
                return false;
            }
            ProcessTaskResponse other = (ProcessTaskResponse) obj;
            // canEqual lets a subclass veto equality with instances of this base type.
            return other.canEqual(this)
                    && java.util.Objects.equals(getResult(), other.getResult())
                    && java.util.Objects.equals(getCode(), other.getCode())
                    && java.util.Objects.equals(getTryAgainTime(), other.getTryAgainTime());
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof ProcessTaskResponse;
        }

        @Override
        public int hashCode() {
            // Same scheme as before: seed 1, multiplier 59, 43 standing in for null.
            int hash = 1;
            for (Object field : new Object[] {getResult(), getCode(), getTryAgainTime()}) {
                hash = hash * 59 + (field == null ? 43 : field.hashCode());
            }
            return hash;
        }

        @Override
        public String toString() {
            return new StringBuilder()
                    .append("TasksProcessingService.ProcessTaskResponse(result=")
                    .append(getResult())
                    .append(", code=")
                    .append(getCode())
                    .append(", tryAgainTime=")
                    .append(getTryAgainTime())
                    .append(")")
                    .toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/transferwise/tasks/processing/TasksProcessingService$ProcessTasksResponse.class */
    /**
     * Result of one pass over a bucket in {@code processTasks}.
     *
     * <p>Carries only the time at which another pass is worth attempting; {@code null} when no
     * such time was recorded.
     */
    public static class ProcessTasksResponse {
        private Instant tryAgainTime;

        public Instant getTryAgainTime() {
            return this.tryAgainTime;
        }

        public ProcessTasksResponse setTryAgainTime(Instant instant) {
            this.tryAgainTime = instant;
            return this;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof ProcessTasksResponse)) {
                return false;
            }
            ProcessTasksResponse other = (ProcessTasksResponse) obj;
            // canEqual lets a subclass veto equality with instances of this base type.
            return other.canEqual(this)
                    && java.util.Objects.equals(getTryAgainTime(), other.getTryAgainTime());
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof ProcessTasksResponse;
        }

        @Override
        public int hashCode() {
            // Seed 1, multiplier 59, 43 standing in for null.
            Instant time = getTryAgainTime();
            int fieldHash = (time != null) ? time.hashCode() : 43;
            return 59 + fieldHash;
        }

        @Override
        public String toString() {
            return new StringBuilder()
                    .append("TasksProcessingService.ProcessTasksResponse(tryAgainTime=")
                    .append(getTryAgainTime())
                    .append(")")
                    .toString();
        }
    }

    /* loaded from: input_file:com/transferwise/tasks/processing/TasksProcessingService$ProcessingResult.class */
    /**
     * Internal classification of how a single task processing attempt ended.
     */
    private enum ProcessingResult {
        // Initial value assumed by processTask before the processor runs.
        SUCCESS,
        // NOTE(review): not used in the visible part of this file; presumably "commit the
        // transaction but still retry the task" - confirm against the sync processing path.
        COMMIT_AND_RETRY,
        // Reported to taskFinished when invoking a processor fails.
        ERROR
    }

    /* loaded from: input_file:com/transferwise/tasks/processing/TasksProcessingService$SyncProcessingRolledbackException.class */
    /**
     * Exception that carries a synchronous processor's {@link ISyncTaskProcessor.ProcessResult}.
     *
     * <p>NOTE(review): judging by the name, it is thrown to force a rollback while still
     * delivering the result to the caller - confirm against the code that throws it.
     */
    protected static class SyncProcessingRolledbackException extends RuntimeException {
        static final long serialVersionUID = 1;
        private final ISyncTaskProcessor.ProcessResult processResult;

        /**
         * @param processResult the processor result to transport with this exception
         */
        public SyncProcessingRolledbackException(ISyncTaskProcessor.ProcessResult processResult) {
            // No message, no cause, suppression disabled, stack trace not writable:
            // the exception is a lightweight carrier, not a diagnostic.
            super(null, null, false, false);
            this.processResult = processResult;
        }

        /** Returns the processor result this exception was created with. */
        public ISyncTaskProcessor.ProcessResult getProcessResult() {
            return this.processResult;
        }
    }

    /**
     * Creates the three cached executors used by this service and registers the
     * ongoing-grabbings counter with the metrics template.
     */
    public void afterPropertiesSet() {
        this.taskExecutor = this.executorsHelper.newCachedExecutor("taskExecutor");
        this.tasksProcessingExecutor = this.executorsHelper.newCachedExecutor("tasksProcessing");
        this.tasksGrabbingExecutor = this.executorsHelper.newCachedExecutor("tasksGrabbing");
        // The metrics template receives the live AtomicInteger instance, not a snapshot of its value.
        this.coreMetricsTemplate.registerOngoingTasksGrabbingsCount(this.ongoingTasksGrabbingsCount);
    }

    /**
     * Queues a task triggering into the in-memory buffer of its bucket.
     *
     * <p>When assertions are enabled, the call must not happen inside an active transaction.
     *
     * @param taskTriggering the triggering to queue; its task must have a non-null type
     * @return a response with {@code FULL} when the bucket already holds the configured maximum
     *         number of triggers, otherwise {@code OK}
     * @throws IllegalStateException if the task type is {@code null}, or if assertions are
     *         enabled and a transaction is active
     */
    @Override // com.transferwise.tasks.processing.ITasksProcessingService
    public ITasksProcessingService.AddTaskForProcessingResponse addTaskForProcessing(TaskTriggering taskTriggering) {
        if (this.tasksProperties.isAssertionsEnabled()) {
            Preconditions.checkState(!TransactionSynchronizationManager.isActualTransactionActive());
        }

        String bucketId = taskTriggering.getBucketId();
        BaseTask task = taskTriggering.getTask();
        if (task.getType() == null) {
            // The message used to contain a stray, unbalanced apostrophe before " type".
            throw new IllegalStateException("Task " + LogUtils.asParameter(task.getVersionId()) + " type is null.");
        }

        BucketProperties bucketProperties = this.bucketsManager.getBucketProperties(bucketId);
        GlobalProcessingState.Bucket bucket = this.globalProcessingState.getBuckets().get(bucketId);
        if (bucket.getSize().get() >= bucketProperties.getMaxTriggersInMemory().intValue()) {
            return new ITasksProcessingService.AddTaskForProcessingResponse()
                    .setResult(ITasksProcessingService.AddTaskForProcessingResponse.ResultCode.FULL);
        }

        // The triggering lands in the slot's intermediate queue; processTasks moves it on later.
        GlobalProcessingState.PrioritySlot prioritySlot =
                bucket.getPrioritySlot(this.priorityManager.normalize(task.getPriority()));
        prioritySlot.getTaskTriggerings().add(taskTriggering);
        bucket.getSize().incrementAndGet();
        bucket.increaseVersion();

        return new ITasksProcessingService.AddTaskForProcessingResponse()
                .setResult(ITasksProcessingService.AddTaskForProcessingResponse.ResultCode.OK);
    }

    /**
     * Sets the callback that processTasks invokes for each triggering it removes from the
     * processing state.
     *
     * <p>Despite the "add" in the name, only one listener is kept: each call replaces the
     * previously set one.
     *
     * @param consumer the listener to install
     */
    @Override // com.transferwise.tasks.processing.ITasksProcessingService
    public void addTaskTriggeringFinishedListener(Consumer<TaskTriggering> consumer) {
        this.taskTriggeringProcessingListener = consumer;
    }

    /**
     * Makes one pass over the bucket and tries to hand at most one task over for grabbing.
     *
     * <p>Priorities are visited in the order returned by {@code bucket.getPriorities()}. For each
     * priority the intermediate buffer is drained first, then the head triggering of every task
     * type is offered to {@link #grabTaskForProcessing}. A type that reports {@code NO_SPACE} is
     * skipped for the rest of the pass. Any other outcome, including a failure, ends the pass
     * after the triggering has been removed and the listener notified.
     *
     * @param bucket the bucket to process
     * @return a response whose try-again time is the earliest one reported by a {@code NO_SPACE}
     *         answer, or {@code null} when a triggering was consumed or no time was reported
     */
    protected ProcessTasksResponse processTasks(GlobalProcessingState.Bucket bucket) {
        String bucketId = bucket.getBucketId();
        MutableObject<Instant> earliestTryAgainTime = new MutableObject<>();
        MutableBoolean triggeringConsumed = new MutableBoolean();
        // Types for which the concurrency policy has already refused space during this pass.
        Set<String> typesWithoutRoom = new HashSet<>();

        for (Integer priority : bucket.getPriorities()) {
            if (triggeringConsumed.isTrue()) {
                break;
            }
            if (this.tasksProperties.isDebugMetricsEnabled()) {
                this.coreMetricsTemplate.debugPriorityQueueCheck(bucketId, priority.intValue());
            }
            GlobalProcessingState.PrioritySlot prioritySlot = bucket.getPrioritySlot(priority);
            transferFromIntermediateBuffer(bucket, prioritySlot);

            // A plain for-each ends when the type list is exhausted. The previous
            // "while (true) { if (it.hasNext()) ... }" form never left the loop in that case.
            for (GlobalProcessingState.TypeTasks typeTasks : prioritySlot.getOrderedTypeTasks()) {
                String type = typeTasks.getType();

                if (typesWithoutRoom.contains(type)) {
                    if (this.tasksProperties.isDebugMetricsEnabled()) {
                        this.coreMetricsTemplate.debugRoomMapAlreadyHasType(bucketId, priority.intValue(), type);
                    }
                    continue;
                }

                TaskTriggering taskTriggering = typeTasks.peek();
                if (taskTriggering == null) {
                    if (this.tasksProperties.isDebugMetricsEnabled()) {
                        this.coreMetricsTemplate.debugTaskTriggeringQueueEmpty(bucketId, priority.intValue(), type);
                    }
                    continue;
                }

                BaseTask task = taskTriggering.getTask();
                this.mdcService.with(() -> {
                    this.mdcService.put(task);
                    try {
                        ProcessTaskResponse response = grabTaskForProcessing(bucketId, task);
                        this.coreMetricsTemplate.registerTaskGrabbingResponse(bucketId, type, priority.intValue(), response);

                        if (response.getResult() == ProcessTaskResponse.Result.NO_SPACE) {
                            typesWithoutRoom.add(type);
                            Instant candidate = response.getTryAgainTime();
                            Instant current = earliestTryAgainTime.getValue();
                            if (candidate != null && (current == null || current.isAfter(candidate))) {
                                earliestTryAgainTime.setValue(candidate);
                            }
                        } else {
                            triggeringConsumed.setTrue();
                        }
                    } catch (Throwable t) {
                        log.error("Scheduling of task '" + task.getVersionId() + "' failed.", t);
                        // A failed triggering is dropped as well, so it cannot block the queue.
                        triggeringConsumed.setTrue();
                    }

                    if (triggeringConsumed.isTrue()) {
                        this.taskTriggeringProcessingListener.accept(taskTriggering);
                        // Remove and re-add around the mutation. Presumably the collection orders
                        // entries by their head triggering - confirm in GlobalProcessingState.
                        prioritySlot.getOrderedTypeTasks().remove(typeTasks);
                        typeTasks.removeLast();
                        log.debug("Removed task '{}' triggering from processingState.", task.getVersionId());
                        prioritySlot.getOrderedTypeTasks().add(typeTasks);
                        bucket.increaseVersion();
                    }
                });

                // The collection was modified above, so stop iterating immediately.
                if (triggeringConsumed.isTrue()) {
                    break;
                }
            }
        }

        ProcessTasksResponse processTasksResponse = new ProcessTasksResponse();
        if (!triggeringConsumed.isTrue()) {
            processTasksResponse.setTryAgainTime(earliestTryAgainTime.getValue());
        }
        return processTasksResponse;
    }

    /**
     * Drains the slot's intermediate triggering queue into its per-type queues.
     *
     * <p>Each drained triggering gets the next global sequence number. The first triggering of a
     * type creates that type's queue, registers a size gauge for it and adds it to the ordered
     * collection. The bucket version is increased once per transferred triggering.
     *
     * @param bucket the bucket owning the slot; used for metrics and version bumps
     * @param prioritySlot the slot whose intermediate queue is drained
     */
    protected void transferFromIntermediateBuffer(GlobalProcessingState.Bucket bucket, GlobalProcessingState.PrioritySlot prioritySlot) {
        TaskTriggering taskTriggering;
        while ((taskTriggering = prioritySlot.getTaskTriggerings().poll()) != null) {
            taskTriggering.setSequence(this.taskTriggeringSequence.incrementAndGet());
            String type = taskTriggering.getTask().getType();

            Map<String, GlobalProcessingState.TypeTasks> typeTasksByType = prioritySlot.getTypeTasks();
            GlobalProcessingState.TypeTasks typeTasks = typeTasksByType.get(type);
            if (typeTasks == null) {
                // The gauge lambda needs an effectively final reference, so it captures this
                // dedicated local rather than the reassigned typeTasks variable.
                final GlobalProcessingState.TypeTasks created = new GlobalProcessingState.TypeTasks().setType(type);
                typeTasksByType.put(type, created);
                this.coreMetricsTemplate.registerProcessingTriggersCount(bucket.getBucketId(), type, () -> {
                    return Integer.valueOf(created.getSize().get());
                });
                prioritySlot.getOrderedTypeTasks().add(created);
                typeTasks = created;
            }

            // When the queue was empty its head is about to change, so take it out of the ordered
            // collection before mutating and put it back afterwards.
            boolean wasEmpty = typeTasks.peek() == null;
            if (wasEmpty) {
                prioritySlot.getOrderedTypeTasks().remove(typeTasks);
            }
            typeTasks.add(taskTriggering);
            if (wasEmpty) {
                prioritySlot.getOrderedTypeTasks().add(typeTasks);
            }
            bucket.increaseVersion();
        }
    }

    /**
     * Moves the task to {@code ERROR} using its current version and records the outcome as a metric.
     *
     * @param iBaseTask the task to mark
     * @param str the bucket id reported with the "marked as error" metric
     */
    protected void markAsError(IBaseTask iBaseTask, String str) {
        boolean statusChanged = this.taskDao.setStatus(
                iBaseTask.getVersionId().getId(),
                TaskStatus.ERROR,
                iBaseTask.getVersionId().getVersion());

        if (!statusChanged) {
            // The update did not apply; the previous status is not known at this point.
            this.coreMetricsTemplate.registerFailedStatusChange(iBaseTask.getType(), TaskStatus.UNKNOWN.name(), TaskStatus.ERROR);
            return;
        }
        this.coreMetricsTemplate.registerTaskMarkedAsError(str, iBaseTask.getType());
    }

    /**
     * Validates that the task can be processed here, books concurrency space for it and submits
     * the actual grabbing to the grabbing executor.
     *
     * <p>Tasks without a handler, processing policy or concurrency policy are marked as
     * {@code ERROR}. Before submitting, the call blocks until the bucket has a free grabbing slot,
     * as limited by the bucket's task grabbing max concurrency.
     *
     * @param str the id of the bucket the task belongs to
     * @param baseTask the task to grab
     * @return {@code OK/HAPPY_FLOW} when grabbing was submitted, {@code OK/NOT_ALLOWED_ON_NODE}
     *         when the policy forbids running on this node, {@code NO_SPACE} (with an optional
     *         try-again time) when no concurrency space is available, otherwise {@code ERROR}
     *         with the matching code
     */
    protected ProcessTaskResponse grabTaskForProcessing(String str, BaseTask baseTask) {
        GlobalProcessingState.Bucket bucket = this.globalProcessingState.getBuckets().get(str);
        BucketProperties bucketProperties = this.bucketsManager.getBucketProperties(str);

        ITaskHandler taskHandler = this.taskHandlerRegistry.getTaskHandler(baseTask);
        if (taskHandler == null) {
            log.error("Marking task {} as ERROR, because no task handler was found for type '{}'.", LogUtils.asParameter(baseTask.getVersionId()), baseTask.getType());
            markAsError(baseTask, str);
            return new ProcessTaskResponse().setResult(ProcessTaskResponse.Result.ERROR).setCode(ProcessTaskResponse.Code.NO_HANDLER);
        }

        ITaskProcessingPolicy processingPolicy = taskHandler.getProcessingPolicy(baseTask);
        if (processingPolicy == null) {
            log.error("Marking task {} as ERROR, as the handler does not provide a processing policy.", LogUtils.asParameter(baseTask.getVersionId()));
            markAsError(baseTask, str);
            return new ProcessTaskResponse().setResult(ProcessTaskResponse.Result.ERROR).setCode(ProcessTaskResponse.Code.NO_POLICY);
        }
        if (!processingPolicy.canExecuteTaskOnThisNode(baseTask)) {
            return new ProcessTaskResponse().setResult(ProcessTaskResponse.Result.OK).setCode(ProcessTaskResponse.Code.NOT_ALLOWED_ON_NODE);
        }

        ITaskConcurrencyPolicy concurrencyPolicy = taskHandler.getConcurrencyPolicy(baseTask);
        if (concurrencyPolicy == null) {
            log.error("Marking task {} as ERROR, as the handler does not provide a concurrency policy.", LogUtils.asParameter(baseTask.getVersionId()));
            markAsError(baseTask, str);
            return new ProcessTaskResponse().setResult(ProcessTaskResponse.Result.ERROR).setCode(ProcessTaskResponse.Code.NO_CONCURRENCY_POLICY);
        }

        ITaskConcurrencyPolicy.BookSpaceResponse bookSpace = concurrencyPolicy.bookSpace(baseTask);
        if (!bookSpace.isHasRoom()) {
            log.debug("There is no space to process task '{}'.", baseTask.getVersionId());
            return new ProcessTaskResponse().setResult(ProcessTaskResponse.Result.NO_SPACE).setTryAgainTime(bookSpace.getTryAgainTime());
        }

        try {
            Lock tasksGrabbingLock = bucket.getTasksGrabbingLock();
            tasksGrabbingLock.lock();
            try {
                // Optimistically reserve a slot; if that exceeds the limit, give it back and
                // wait for a signal from a finishing grabbing, or for the timeout.
                while (bucket.getInProgressTasksGrabbingCount().incrementAndGet() > bucketProperties.getTaskGrabbingMaxConcurrency().intValue()) {
                    bucket.getInProgressTasksGrabbingCount().decrementAndGet();
                    bucket.getTasksGrabbingCondition().await(this.tasksProperties.getGenericMediumDelay().toMillis(), TimeUnit.MILLISECONDS);
                }
            } finally {
                // Release on every exit path, including a failure while evaluating the loop
                // condition, which previously left the lock held.
                tasksGrabbingLock.unlock();
            }

            this.ongoingTasksGrabbingsCount.incrementAndGet();
            this.tasksGrabbingExecutor.submit(() -> {
                try {
                    grabTaskForProcessing0(bucket, baseTask, concurrencyPolicy, taskHandler);
                } catch (Throwable t) {
                    log.error("Task grabbing failed for '{}'.", baseTask.getVersionId(), t);
                }
            });
            return new ProcessTaskResponse().setResult(ProcessTaskResponse.Result.OK).setCode(ProcessTaskResponse.Code.HAPPY_FLOW);
        } catch (Throwable t) {
            log.error(t.getMessage(), t);
            // Grabbing was never started, so hand the booked space back and fail the task.
            concurrencyPolicy.freeSpace(baseTask);
            this.globalProcessingState.increaseBucketsVersion();
            if (!this.taskDao.setStatus(baseTask.getId(), TaskStatus.ERROR, baseTask.getVersion())) {
                this.coreMetricsTemplate.registerFailedStatusChange(baseTask.getType(), TaskStatus.UNKNOWN.name(), TaskStatus.ERROR);
            }
            return new ProcessTaskResponse().setResult(ProcessTaskResponse.Result.ERROR).setCode(ProcessTaskResponse.Code.UNKNOWN_ERROR);
        }
    }

    /**
     * Grabs the task in the database and, on success, schedules it for processing.
     *
     * <p>Runs inside its own entry point context. If the task is not grabbed - because its
     * version no longer matches, the DAO returns nothing or an error occurs - the booked
     * concurrency space is freed and a failed-grabbing metric is recorded. Errors are logged and
     * not propagated.
     *
     * <p>Whatever the outcome, the bookkeeping done when the grabbing was submitted is undone
     * exactly once: the bucket size and the in-progress grabbing count are decremented, waiters
     * on the grabbing condition are signalled and the ongoing-grabbings counter is decremented.
     *
     * @param bucket the bucket the task was taken from
     * @param baseTask the task to grab
     * @param iTaskConcurrencyPolicy the policy that booked space for the task
     * @param iTaskHandler the handler that will process the task
     */
    protected void grabTaskForProcessing0(GlobalProcessingState.Bucket bucket, BaseTask baseTask, ITaskConcurrencyPolicy iTaskConcurrencyPolicy, ITaskHandler iTaskHandler) {
        this.unitOfWorkManager.createEntryPoint(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.GRAB_TASK).toContext().execute(() -> {
            try {
                Instant processingDeadline = iTaskHandler.getProcessingPolicy(baseTask).getProcessingDeadline(baseTask);

                boolean grabbed = false;
                try {
                    boolean versionMatches = true;
                    if (this.tasksProperties.isCheckVersionBeforeGrabbing()) {
                        // Cheap pre-check that avoids a grab attempt doomed to fail on a stale version.
                        Long versionInDb = this.taskDao.getTaskVersion(baseTask.getId());
                        versionMatches = versionInDb != null && versionInDb.longValue() == baseTask.getVersion();
                    }

                    Task grabbedTask = versionMatches
                            ? this.taskDao.grabForProcessing(baseTask, this.tasksProperties.getClientId(), processingDeadline)
                            : null;
                    if (grabbedTask == null) {
                        log.debug("Task '{}' was not available for processing with its version.", baseTask.getVersionId());
                    } else {
                        scheduleTask(bucket.getBucketId(), iTaskHandler, iTaskConcurrencyPolicy, grabbedTask);
                        grabbed = true;
                    }
                } finally {
                    if (!grabbed) {
                        iTaskConcurrencyPolicy.freeSpace(baseTask);
                        this.globalProcessingState.increaseBucketsVersion();
                        this.coreMetricsTemplate.registerFailedTaskGrabbing(bucket.getBucketId(), baseTask.getType());
                    }
                }
            } catch (Throwable t) {
                log.error("Grabbing task '" + baseTask.getVersionId() + "' failed.", t);
            } finally {
                // One cleanup for all paths. The earlier shape repeated this per path, unlocked
                // twice on failure and could decrement the counters more than once.
                bucket.getSize().decrementAndGet();
                bucket.increaseVersion();

                Lock tasksGrabbingLock = bucket.getTasksGrabbingLock();
                tasksGrabbingLock.lock();
                try {
                    bucket.getInProgressTasksGrabbingCount().decrementAndGet();
                    bucket.getTasksGrabbingCondition().signalAll();
                } finally {
                    tasksGrabbingLock.unlock();
                }
                this.ongoingTasksGrabbingsCount.decrementAndGet();
            }
        });
    }

    /**
     * Submits processing of an already grabbed task to the task executor.
     *
     * <p>Anything thrown by {@code processTask} is logged on the executor thread and not propagated.
     *
     * @param str the id of the bucket the task belongs to
     * @param iTaskHandler the handler that processes the task
     * @param iTaskConcurrencyPolicy the policy that booked space for the task
     * @param task the grabbed task
     */
    protected void scheduleTask(String str, ITaskHandler iTaskHandler, ITaskConcurrencyPolicy iTaskConcurrencyPolicy, Task task) {
        Runnable processing = () -> {
            try {
                processTask(iTaskHandler, iTaskConcurrencyPolicy, str, task);
            } catch (Throwable failure) {
                log.error("Task processing failed for '{}'.", task.getVersionId(), failure);
            }
        };
        this.taskExecutor.submit(processing);
    }

    /**
     * Runs {@code runnable} wrapped by the registered interceptors, starting at index {@code i}.
     *
     * <p>Each interceptor receives a continuation that invokes the rest of the chain. Once the
     * index is past the last interceptor, {@code runnable} itself is executed.
     *
     * @param i index of the next interceptor to apply
     * @param task the task being processed
     * @param runnable the processing logic at the end of the chain
     */
    protected void processWithInterceptors(int i, Task task, Runnable runnable) {
        boolean chainExhausted = i >= this.taskProcessingInterceptors.size();
        if (chainExhausted) {
            runnable.run();
            return;
        }

        ITaskProcessingInterceptor interceptor = this.taskProcessingInterceptors.get(i);
        int nextIndex = i + 1;
        interceptor.doProcess(task, () -> processWithInterceptors(nextIndex, task, runnable));
    }

    protected void processTask(ITaskHandler iTaskHandler, ITaskConcurrencyPolicy iTaskConcurrencyPolicy, String str, Task task) {
        this.mdcService.clear();
        try {
            try {
                this.unitOfWorkManager.createEntryPoint(EntryPointsGroups.TW_TASKS, "Processing_" + task.getType()).toContext().execute(() -> {
                    this.runningTasksCount.incrementAndGet();
                    this.globalProcessingState.getBuckets().get(str).getRunningTasksCount().incrementAndGet();
                    this.mdcService.put((ITask) task);
                    UUID id = task.getId();
                    ITaskProcessingPolicy processingPolicy = iTaskHandler.getProcessingPolicy(task);
                    Instant processingDeadline = processingPolicy.getProcessingDeadline(task);
                    Criticality processingCriticality = processingPolicy.getProcessingCriticality(task);
                    String owner = processingPolicy.getOwner(task);
                    TwContext current = TwContext.current();
                    UnitOfWork unitOfWork = this.unitOfWorkManager.getUnitOfWork(current);
                    unitOfWork.setCriticality(processingCriticality);
                    if (owner != null) {
                        current.setOwner(owner);
                    }
                    long millis = TwContextClockHolder.getClock().millis();
                    MutableObject mutableObject = new MutableObject(ProcessingResult.SUCCESS);
                    ITaskProcessor processor = iTaskHandler.getProcessor(task.toBaseTask());
                    processWithInterceptors(0, task, () -> {
                        log.debug("Processing task '{}' using {}.", id, processor);
                        try {
                            if (!(processor instanceof ISyncTaskProcessor)) {
                                if (!(processor instanceof IAsyncTaskProcessor)) {
                                    log.error("Marking task {} as ERROR, because No suitable processor found for task " + LogUtils.asParameter(task.getVersionId()) + ".");
                                    markAsError(task, str);
                                    return;
                                }
                                AtomicBoolean atomicBoolean = new AtomicBoolean();
                                try {
                                    this.coreMetricsTemplate.registerTaskProcessingStart(str, task.getType());
                                    unitOfWork.setDeadline(processingDeadline);
                                    try {
                                        ((IAsyncTaskProcessor) processor).process(task, () -> {
                                            unitOfWork.setDeadline((Instant) null);
                                            markTaskAsDoneFromAsync(task, atomicBoolean.compareAndSet(false, true), str, iTaskConcurrencyPolicy, millis);
                                        }, th -> {
                                            unitOfWork.setDeadline((Instant) null);
                                            setRetriesOrErrorFromAsync(task, atomicBoolean.compareAndSet(false, true), str, iTaskConcurrencyPolicy, millis, iTaskHandler, th);
                                        });
                                        unitOfWork.setDeadline((Instant) null);
                                        return;
                                    } catch (Throwable th2) {
                                        unitOfWork.setDeadline((Instant) null);
                                        throw th2;
                                    }
                                } catch (Throwable th3) {
                                    log.error("Processing task '" + task.getVersionId() + "' failed.", th3);
                                    if (atomicBoolean.compareAndSet(false, true)) {
                                        taskFinished(str, iTaskConcurrencyPolicy, task, millis, ProcessingResult.ERROR);
                                    }
                                    setRetriesOrError(str, iTaskHandler, task, th3);
                                    return;
                                }
                            }
                            try {
                                try {
                                    ISyncTaskProcessor iSyncTaskProcessor = (ISyncTaskProcessor) processor;
                                    MutableObject mutableObject2 = new MutableObject();
                                    boolean isTransactional = iSyncTaskProcessor.isTransactional(task);
                                    Runnable runnable = () -> {
                                        this.coreMetricsTemplate.registerTaskProcessingStart(str, task.getType());
                                        LockUtils.withLock(this.tasksProcessingThreadsLock, () -> {
                                            return Boolean.valueOf(this.tasksProcessingThreads.add(Thread.currentThread()));
                                        });
                                        try {
                                            unitOfWork.setDeadline(processingDeadline);
                                            mutableObject2.setValue(iSyncTaskProcessor.process(task));
                                            unitOfWork.setDeadline((Instant) null);
                                            LockUtils.withLock(this.tasksProcessingThreadsLock, () -> {
                                                return Boolean.valueOf(this.tasksProcessingThreads.remove(Thread.currentThread()));
                                            });
                                        } catch (Throwable th4) {
                                            unitOfWork.setDeadline((Instant) null);
                                            LockUtils.withLock(this.tasksProcessingThreadsLock, () -> {
                                                return Boolean.valueOf(this.tasksProcessingThreads.remove(Thread.currentThread()));
                                            });
                                            throw th4;
                                        }
                                    };
                                    if (!isTransactional) {
                                        runnable.run();
                                    }
                                    this.transactionsHelper.withTransaction().asNew().call(() -> {
                                        if (isTransactional) {
                                            runnable.run();
                                        }
                                        ISyncTaskProcessor.ProcessResult processResult = (ISyncTaskProcessor.ProcessResult) mutableObject2.getValue();
                                        if (this.transactionsHelper.isRollbackOnly()) {
                                            log.debug("Task processor for task '{}' has crappy code. Fixing it with a rollback exception.", task.getVersionId());
                                            throw new SyncProcessingRolledbackException(processResult);
                                        }
                                        if (processResult == null || processResult.getResultCode() == null || processResult.getResultCode() == ISyncTaskProcessor.ProcessResult.ResultCode.DONE) {
                                            markTaskAsDone(task);
                                            return null;
                                        }
                                        if (processResult.getResultCode() == ISyncTaskProcessor.ProcessResult.ResultCode.COMMIT_AND_RETRY) {
                                            mutableObject.setValue(ProcessingResult.COMMIT_AND_RETRY);
                                            setRepeatOnSuccess(str, iTaskHandler, task);
                                            return null;
                                        }
                                        if (processResult.getResultCode() == ISyncTaskProcessor.ProcessResult.ResultCode.DONE_AND_DELETE) {
                                            deleteTask(task);
                                            return null;
                                        }
                                        mutableObject.setValue(ProcessingResult.ERROR);
                                        setRetriesOrError(str, iTaskHandler, task, null);
                                        return null;
                                    });
                                    taskFinished(str, iTaskConcurrencyPolicy, task, millis, (ProcessingResult) mutableObject.getValue());
                                } catch (SyncProcessingRolledbackException e) {
                                    try {
                                        this.transactionsHelper.withTransaction().asNew().call(() -> {
                                            ISyncTaskProcessor.ProcessResult processResult = e.getProcessResult();
                                            if (processResult == null || processResult.getResultCode() == null || processResult.getResultCode() == ISyncTaskProcessor.ProcessResult.ResultCode.DONE) {
                                                markTaskAsDone(task);
                                                return null;
                                            }
                                            if (processResult.getResultCode() == ISyncTaskProcessor.ProcessResult.ResultCode.DONE_AND_DELETE) {
                                                deleteTask(task);
                                                return null;
                                            }
                                            setRetriesOrError(str, iTaskHandler, task, null);
                                            return null;
                                        });
                                    } catch (Throwable th4) {
                                        log.error("Processing task {} type: '{}' subType: '{}' failed.", new Object[]{LogUtils.asParameter(task.getVersionId()), task.getType(), task.getSubType(), th4});
                                        mutableObject.setValue(ProcessingResult.ERROR);
                                        setRetriesOrError(str, iTaskHandler, task, th4);
                                    }
                                    taskFinished(str, iTaskConcurrencyPolicy, task, millis, (ProcessingResult) mutableObject.getValue());
                                }
                            } catch (Throwable th5) {
                                if (Thread.interrupted()) {
                                    log.debug("Task {} got interrupted.", LogUtils.asParameter(task.getVersionId()));
                                }
                                log.error("Processing task {} type: '{}' subType: '{}' failed.", new Object[]{LogUtils.asParameter(task.getVersionId()), task.getType(), task.getSubType(), th5});
                                mutableObject.setValue(ProcessingResult.ERROR);
                                setRetriesOrError(str, iTaskHandler, task, th5);
                                taskFinished(str, iTaskConcurrencyPolicy, task, millis, (ProcessingResult) mutableObject.getValue());
                            }
                        } catch (Throwable th6) {
                            taskFinished(str, iTaskConcurrencyPolicy, task, millis, (ProcessingResult) mutableObject.getValue());
                            throw th6;
                        }
                    });
                });
                this.mdcService.clear();
            } catch (Throwable th) {
                log.error("Processing task '" + task.getVersionId() + "' failed.", th);
                this.mdcService.clear();
            }
        } catch (Throwable th2) {
            this.mdcService.clear();
            throw th2;
        }
    }

    /**
     * Success path for asynchronously processed tasks.
     *
     * <p>Runs inside the {@code ASYNC_HANDLE_SUCCESS} entry point: puts the task into the MDC, optionally releases the
     * processing slot and then marks the task as done.
     *
     * @param task the task that finished
     * @param z whether this call is the one that has to report the task as finished (release the slot, record metrics)
     * @param str bucket id the task was processed in
     * @param iTaskConcurrencyPolicy concurrency policy whose space is freed when {@code z} is true
     * @param j processing start time in epoch milliseconds, forwarded to the processing-end metric
     */
    protected void markTaskAsDoneFromAsync(Task task, boolean z, String str, ITaskConcurrencyPolicy iTaskConcurrencyPolicy, long j) {
        entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.ASYNC_HANDLE_SUCCESS, () -> {
            mdcService.put((ITask) task);

            // Only the caller that was told to do the bookkeeping releases the slot.
            if (z) {
                taskFinished(str, iTaskConcurrencyPolicy, task, j, ProcessingResult.SUCCESS);
            }

            markTaskAsDone(task);
            return null;
        });
    }

    /**
     * Failure path for asynchronously processed tasks.
     *
     * <p>Runs inside the {@code ASYNC_HANDLE_FAIL} entry point: puts the task into the MDC, optionally releases the
     * processing slot with an {@code ERROR} result and then schedules a retry or marks the task as ERROR.
     *
     * @param task the task that failed
     * @param z whether this call is the one that has to report the task as finished (release the slot, record metrics)
     * @param str bucket id the task was processed in
     * @param iTaskConcurrencyPolicy concurrency policy whose space is freed when {@code z} is true
     * @param j processing start time in epoch milliseconds, forwarded to the processing-end metric
     * @param iTaskHandler handler supplying the retry policy
     * @param th the failure reported by the processor, handed to the retry policy
     */
    protected void setRetriesOrErrorFromAsync(Task task, boolean z, String str, ITaskConcurrencyPolicy iTaskConcurrencyPolicy, long j, ITaskHandler iTaskHandler, Throwable th) {
        entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.ASYNC_HANDLE_FAIL, () -> {
            mdcService.put((ITask) task);

            // Only the caller that was told to do the bookkeeping releases the slot.
            if (z) {
                taskFinished(str, iTaskConcurrencyPolicy, task, j, ProcessingResult.ERROR);
            }

            setRetriesOrError(str, iTaskHandler, task, th);
            return null;
        });
    }

    /**
     * Finalizes a successfully processed task according to the configured finish strategy.
     *
     * <ul>
     *   <li>delete-on-finish: the task row is deleted;</li>
     *   <li>clear-payload-on-finish: the payload is cleared and the task is marked DONE;</li>
     *   <li>otherwise: only the status is set to DONE.</li>
     * </ul>
     *
     * <p>When the DONE transition is rejected by the DAO, a failed status change is registered in metrics.
     *
     * @param task the task to finalize; its id and version are used for the DAO call
     */
    protected void markTaskAsDone(Task task) {
        final UUID taskId = task.getId();
        log.debug("Task '{}' finished successfully.", taskId);

        if (tasksProperties.isDeleteTaskOnFinish()) {
            taskDao.deleteTask(taskId, task.getVersion());
            return;
        }

        final boolean markedDone;
        if (tasksProperties.isClearPayloadOnFinish()) {
            markedDone = taskDao.clearPayloadAndMarkDone(taskId, task.getVersion());
        } else {
            markedDone = taskDao.setStatus(taskId, TaskStatus.DONE, task.getVersion());
        }

        if (!markedDone) {
            coreMetricsTemplate.registerFailedStatusChange(task.getType(), task.getStatus(), TaskStatus.DONE);
        }
    }

    /**
     * Deletes a successfully processed task through the DAO, using the task's id and version.
     *
     * @param task the task to delete
     */
    private void deleteTask(Task task) {
        final UUID taskId = task.getId();
        log.debug("Task '{}' finished successfully, deleting.", taskId);

        taskDao.deleteTask(taskId, task.getVersion());
    }

    /**
     * Schedules a task for another run after a successful processing round (COMMIT_AND_RETRY).
     *
     * <p>The retry policy is asked once whether the tries count should be reset; the same answer is used for the
     * in-memory task, the log line and the DAO update, so the three can never disagree. If the policy yields no
     * retry time, the task falls through to {@link #setRetriesOrError(String, Task, ZonedDateTime)} with
     * {@code null}, which marks it as ERROR.
     *
     * @param str bucket id, used for metrics
     * @param iTaskHandler handler supplying the retry policy
     * @param task the task to be repeated
     */
    private void setRepeatOnSuccess(String str, ITaskHandler iTaskHandler, Task task) {
        ITaskRetryPolicy retryPolicy = iTaskHandler.getRetryPolicy(task.toBaseTask());

        // Evaluate once: the task is mutated right below, so asking the policy again later could give a different answer.
        final boolean resetTriesCount = retryPolicy.resetTriesCountOnSuccess(task);
        if (resetTriesCount) {
            task.setProcessingTriesCount(0L);
        }

        ZonedDateTime retryTime = retryPolicy.getRetryTime(task, null);
        if (retryTime == null) {
            setRetriesOrError(str, task, null);
            return;
        }

        log.debug("Repeating task '{}' will be reprocessed @ {}. Tries count reset: {}.", new Object[]{task.getVersionId(), retryTime, Boolean.valueOf(resetTriesCount)});
        this.coreMetricsTemplate.registerTaskRetry(str, task.getType());
        setToBeRetried(task, retryTime, resetTriesCount);
    }

    /**
     * Asks the handler's retry policy for the next retry time and delegates to
     * {@link #setRetriesOrError(String, Task, ZonedDateTime)}.
     *
     * @param str bucket id, used for metrics
     * @param iTaskHandler handler supplying the retry policy
     * @param task the task that failed
     * @param th the failure passed to the retry policy; may be {@code null}
     */
    protected void setRetriesOrError(String str, ITaskHandler iTaskHandler, Task task, Throwable th) {
        ITaskRetryPolicy retryPolicy = iTaskHandler.getRetryPolicy(task.toBaseTask());
        ZonedDateTime retryTime = retryPolicy.getRetryTime(task, th);
        setRetriesOrError(str, task, retryTime);
    }

    /**
     * Either schedules the task for a retry or marks it as ERROR.
     *
     * @param str bucket id, used for metrics
     * @param task the task to update
     * @param zonedDateTime the time of the next attempt; {@code null} means no retry and the task is set to ERROR
     */
    protected void setRetriesOrError(String str, Task task, ZonedDateTime zonedDateTime) {
        if (zonedDateTime == null) {
            // No further attempts: move the task to ERROR and record whether the transition succeeded.
            log.info("Task {} marked as ERROR.", LogUtils.asParameter(task.getVersionId()));
            boolean statusChanged = taskDao.setStatus(task.getId(), TaskStatus.ERROR, task.getVersion());
            if (statusChanged) {
                coreMetricsTemplate.registerTaskMarkedAsError(str, task.getType());
            } else {
                coreMetricsTemplate.registerFailedStatusChange(task.getType(), task.getStatus(), TaskStatus.ERROR);
            }
            return;
        }

        log.info("Task {} will be reprocessed @ " + zonedDateTime + ".", LogUtils.asParameter(task.getVersionId()));
        coreMetricsTemplate.registerTaskRetryOnError(str, task.getType());
        setToBeRetried(task, zonedDateTime, false);
    }

    /**
     * Persists the retry schedule for a task and registers a failed status change (towards WAITING) when the DAO
     * rejects the update.
     *
     * @param task the task to reschedule
     * @param zonedDateTime the time of the next attempt
     * @param z flag forwarded to the DAO; callers pass whether the tries count should be reset
     */
    private void setToBeRetried(Task task, ZonedDateTime zonedDateTime, boolean z) {
        boolean updated = taskDao.setToBeRetried(task.getId(), zonedDateTime, task.getVersion(), z);
        if (!updated) {
            coreMetricsTemplate.registerFailedStatusChange(task.getType(), task.getStatus(), TaskStatus.WAITING);
        }
    }

    /**
     * Releases the bookkeeping held for a task that has stopped processing.
     *
     * <p>Decrements the global and per-bucket running counters, records the processing-end metric, frees the space in
     * the concurrency policy and bumps the buckets version.
     *
     * @param str bucket id
     * @param iTaskConcurrencyPolicy policy whose space is freed
     * @param task the task that finished
     * @param j processing start time in epoch milliseconds, forwarded to the metric
     * @param processingResult outcome; its name is reported in the metric
     */
    private void taskFinished(String str, ITaskConcurrencyPolicy iTaskConcurrencyPolicy, Task task, long j, ProcessingResult processingResult) {
        runningTasksCount.decrementAndGet();

        GlobalProcessingState.Bucket bucket = globalProcessingState.getBuckets().get(str);
        bucket.getRunningTasksCount().decrementAndGet();

        coreMetricsTemplate.registerTaskProcessingEnd(str, task.getType(), j, processingResult.name());
        iTaskConcurrencyPolicy.freeSpace(task);

        globalProcessingState.increaseBucketsVersion();
    }

    /**
     * Starts the shutdown sequence: records the shutdown start time, raises the shutting-down flag, shuts down the
     * processing executor and increases the version of every bucket.
     */
    public void prepareForShutdown() {
        shutdownStartTime = Instant.now(TwContextClockHolder.getClock());
        shuttingDown = true;
        tasksProcessingExecutor.shutdown();

        for (String bucketId : bucketsManager.getBucketIds()) {
            globalProcessingState.getBuckets().get(bucketId).increaseVersion();
        }
    }

    /**
     * Reports whether processing has fully stopped.
     *
     * <p>If an interrupt-after-shutdown duration is configured and more time than that has passed since the shutdown
     * started, every thread currently registered as processing a task is interrupted (under the threads lock).
     *
     * @return {@code true} when the processing executor is terminated and no tasks are counted as running
     */
    public boolean canShutdown() {
        Duration interruptAfter = tasksProperties.getInterruptTasksAfterShutdownTime();
        if (interruptAfter != null) {
            Duration sinceShutdownStart = Duration.between(shutdownStartTime, Instant.now(TwContextClockHolder.getClock()));
            if (sinceShutdownStart.compareTo(interruptAfter) > 0) {
                LockUtils.withLock(tasksProcessingThreadsLock, () -> {
                    log.info("taskProcessingThreads: " + tasksProcessingThreads.size());
                    for (Thread processingThread : tasksProcessingThreads) {
                        log.info("Interrupting thread: " + processingThread);
                        processingThread.interrupt();
                    }
                });
            }
        }

        if (!tasksProcessingExecutor.isTerminated()) {
            return false;
        }
        return runningTasksCount.get() == 0;
    }

    /**
     * Starts task processing for every configured bucket; does nothing if processing was already started.
     *
     * <p>For each bucket this registers the bucket's gauges (running tasks, in-progress grabbings, triggers count and
     * state version) and submits a loop to the processing executor. The loop calls {@code processTasks(bucket)} and,
     * if the bucket version did not change meanwhile, waits on the bucket's version condition until either the
     * generic medium delay passes or the returned try-again time is reached. The loop ends when shutdown has begun.
     *
     * <p>The whole method runs under {@code lifecycleLock}, which is released exactly once in {@code finally}.
     */
    @Override // com.transferwise.tasks.processing.ITasksProcessingService
    public void startProcessing() {
        this.lifecycleLock.lock();
        try {
            if (this.processingStarted) {
                return;
            }
            for (String str : this.bucketsManager.getBucketIds()) {
                if (!this.bucketsManager.getBucketProperties(str).getTriggerSameTaskInAllNodes().booleanValue() && this.tasksProperties.isCheckVersionBeforeGrabbing()) {
                    log.warn("Suboptimal configuration for bucket '" + str + "' found. triggerSameTaskInAllNodes=false and checkVersionBeforeGrabbing=true.");
                }
                GlobalProcessingState.Bucket bucket = this.globalProcessingState.getBuckets().get(str);
                this.coreMetricsTemplate.registerRunningTasksCount(str, () -> {
                    return Integer.valueOf(bucket.getRunningTasksCount().get());
                });
                this.coreMetricsTemplate.registerInProgressTasksGrabbingCount(str, () -> {
                    return Integer.valueOf(bucket.getInProgressTasksGrabbingCount().get());
                });
                this.coreMetricsTemplate.registerProcessingTriggersCount(str, () -> {
                    return Integer.valueOf(bucket.getSize().get());
                });
                this.coreMetricsTemplate.registerProcessingStateVersion(str, () -> {
                    return Long.valueOf(bucket.getVersion().get());
                });
                this.tasksProcessingExecutor.submit(() -> {
                    while (!this.shuttingDown) {
                        try {
                            // Remember the version seen before processing, so a concurrent change skips the wait.
                            long versionBeforeProcessing = bucket.getVersion().get();
                            Instant tryAgainTime = processTasks(bucket).getTryAgainTime();
                            Lock versionLock = bucket.getVersionLock();
                            versionLock.lock();
                            try {
                                if (bucket.getVersion().get() == versionBeforeProcessing && !this.shuttingDown) {
                                    // Without a try-again time only the generic medium delay bounds the wait.
                                    long untilTryAgainMs = tryAgainTime == null
                                        ? Integer.MAX_VALUE
                                        : tryAgainTime.toEpochMilli() - TwContextClockHolder.getClock().millis();
                                    long waitTimeMs = Math.min(this.tasksProperties.getGenericMediumDelay().toMillis(), untilTryAgainMs);
                                    if (waitTimeMs > 0) {
                                        try {
                                            bucket.getVersionCondition().await(waitTimeMs, TimeUnit.MILLISECONDS);
                                        } catch (InterruptedException e) {
                                            log.error(e.getMessage(), e);
                                        }
                                    }
                                }
                            } finally {
                                // Single release point; covers both the normal and the exceptional path.
                                versionLock.unlock();
                            }
                        } catch (Throwable th) {
                            log.error(th.getMessage(), th);
                            WaitUtils.sleepQuietly(this.tasksProperties.getGenericMediumDelay());
                        }
                    }
                });
            }
            this.processingStarted = true;
        } finally {
            // The lock must be released exactly once; a second unlock() would throw on a ReentrantLock.
            this.lifecycleLock.unlock();
        }
    }
}
