package com.transferwise.tasks.stucktasks;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.transferwise.common.baseutils.ExceptionUtils;
import com.transferwise.common.baseutils.clock.ClockHolder;
import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor;
import com.transferwise.common.baseutils.concurrency.ThreadNamingExecutorServiceWrapper;
import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy;
import com.transferwise.common.leaderselector.Leader;
import com.transferwise.common.leaderselector.LeaderSelector;
import com.transferwise.tasks.TasksProperties;
import com.transferwise.tasks.config.IExecutorServicesProvider;
import com.transferwise.tasks.dao.ITaskDao;
import com.transferwise.tasks.domain.BaseTask;
import com.transferwise.tasks.domain.IBaseTask;
import com.transferwise.tasks.domain.TaskStatus;
import com.transferwise.tasks.handler.interfaces.ITaskHandler;
import com.transferwise.tasks.handler.interfaces.ITaskHandlerRegistry;
import com.transferwise.tasks.handler.interfaces.ITaskProcessingPolicy;
import com.transferwise.tasks.helpers.IMeterHelper;
import com.transferwise.tasks.mdc.MdcContext;
import com.transferwise.tasks.triggering.ITasksExecutionTriggerer;
import com.transferwise.tasks.utils.DomainUtils;
import com.transferwise.tasks.utils.LogUtils;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAmount;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/transferwise/tasks/stucktasks/TasksResumer.class */
public class TasksResumer implements ITasksResumer, GracefulShutdownStrategy {
    private static final Logger log = LoggerFactory.getLogger(TasksResumer.class);
    private final ITasksExecutionTriggerer tasksExecutionTriggerer;
    private final ITaskHandlerRegistry taskHandlerRegistry;
    private final TasksProperties tasksProperties;
    private final ITaskDao taskDao;
    private final CuratorFramework curatorFramework;
    private final IExecutorServicesProvider executorServicesProvider;
    private final IMeterHelper meterHelper;
    private LeaderSelector leaderSelector;
    private volatile boolean paused;
    private volatile boolean shuttingDown = false;
    private int batchSize = 1000;

    @PostConstruct
    public void init() {
        String str = "/tw/tw_tasks/" + this.tasksProperties.getGroupId() + "/tasks_resumer";
        ThreadNamingExecutorServiceWrapper threadNamingExecutorServiceWrapper = new ThreadNamingExecutorServiceWrapper("tw-tasks-resumer", this.executorServicesProvider.getGlobalExecutorService());
        verifyCorrectCuratorConfig();
        this.leaderSelector = new LeaderSelector(this.curatorFramework, str, threadNamingExecutorServiceWrapper, control -> {
            ScheduledTaskExecutor globalScheduledTaskExecutor = this.executorServicesProvider.getGlobalScheduledTaskExecutor();
            MutableObject mutableObject = new MutableObject();
            MutableObject mutableObject2 = new MutableObject();
            control.workAsyncUntilShouldStop(() -> {
                mutableObject.setValue(globalScheduledTaskExecutor.scheduleAtFixedInterval(() -> {
                    resumeStuckTasks(control);
                }, this.tasksProperties.getStuckTasksPollingInterval(), this.tasksProperties.getStuckTasksPollingInterval()));
                log.info("Started to resume stuck tasks after each " + this.tasksProperties.getStuckTasksPollingInterval() + ".");
                mutableObject2.setValue(globalScheduledTaskExecutor.scheduleAtFixedInterval(() -> {
                    if (this.paused) {
                        return;
                    }
                    resumeWaitingTasks(control);
                }, this.tasksProperties.getWaitingTasksPollingInterval(), this.tasksProperties.getWaitingTasksPollingInterval()));
                log.info("Started to resume scheduled tasks after each " + this.tasksProperties.getWaitingTasksPollingInterval() + ".");
            }, () -> {
                log.info("Stopping stuck tasks resumer.");
                if (mutableObject.getValue() != null) {
                    ((ScheduledTaskExecutor.TaskHandle) mutableObject.getValue()).stop();
                }
                log.info("Stopping scheduled tasks executor.");
                if (mutableObject2.getValue() != null) {
                    ((ScheduledTaskExecutor.TaskHandle) mutableObject2.getValue()).stop();
                }
                if (mutableObject.getValue() != null) {
                    ((ScheduledTaskExecutor.TaskHandle) mutableObject.getValue()).waitUntilStopped(Duration.ofMinutes(1L));
                }
                log.info("Stuck tasks resumer stopped.");
                if (mutableObject2.getValue() != null) {
                    ((ScheduledTaskExecutor.TaskHandle) mutableObject2.getValue()).waitUntilStopped(Duration.ofMinutes(1L));
                }
                log.info("Scheduled tasks executor stopped.");
            });
        });
    }

    protected void verifyCorrectCuratorConfig() {
        if (this.tasksProperties.isPreventStartWithoutZookeeper()) {
            ExceptionUtils.doUnchecked(() -> {
                while (!this.curatorFramework.blockUntilConnected(5, TimeUnit.SECONDS)) {
                    log.error("Connection to Zookeeper at '{}' failed.", this.curatorFramework.getZookeeperClient().getCurrentConnectionString());
                }
            });
        }
    }

    @Trace(dispatcher = true)
    protected void resumeStuckTasks(Leader.Control control) {
        ITaskDao.GetStuckTasksResponse stuckTasks;
        NewRelic.setTransactionName("TwTasksEngine", "ResumeStuckTasks");
        do {
            try {
                stuckTasks = this.taskDao.getStuckTasks(this.batchSize, TaskStatus.NEW, TaskStatus.SUBMITTED, TaskStatus.PROCESSING);
                AtomicInteger atomicInteger = new AtomicInteger();
                AtomicInteger atomicInteger2 = new AtomicInteger();
                AtomicInteger atomicInteger3 = new AtomicInteger();
                for (ITaskDao.StuckTask stuckTask : stuckTasks.getStuckTasks()) {
                    if (control.shouldStop()) {
                        return;
                    } else {
                        MdcContext.with(() -> {
                            MdcContext.put(this.tasksProperties.getTwTaskVersionIdMdcKey(), stuckTask.getVersionId());
                            handleStuckTask(stuckTask, atomicInteger, atomicInteger2, atomicInteger3);
                        });
                    }
                }
                log.debug("Resumed " + atomicInteger + ", marked as error/failed " + atomicInteger2 + " / " + atomicInteger3 + " stuck tasks.");
            } catch (Throwable th) {
                log.error(th.getMessage(), th);
                return;
            }
        } while (stuckTasks.isHasMore());
    }

    @Trace(dispatcher = true)
    protected void resumeWaitingTasks(Leader.Control control) {
        ITaskDao.GetStuckTasksResponse stuckTasks;
        NewRelic.setTransactionName("TwTasksEngine", "WaitingTasksResumer");
        do {
            try {
                stuckTasks = this.taskDao.getStuckTasks(this.batchSize, TaskStatus.WAITING);
                for (ITaskDao.StuckTask stuckTask : stuckTasks.getStuckTasks()) {
                    if (control.shouldStop()) {
                        return;
                    } else {
                        MdcContext.with(() -> {
                            MdcContext.put(this.tasksProperties.getTwTaskVersionIdMdcKey(), stuckTask.getVersionId());
                            this.taskDao.setStatus(stuckTask.getVersionId().getId(), TaskStatus.SUBMITTED, stuckTask.getVersionId().getVersion());
                            BaseTask baseTask = (BaseTask) DomainUtils.convert(stuckTask, BaseTask.class);
                            baseTask.setVersion(baseTask.getVersion() + 1);
                            this.tasksExecutionTriggerer.trigger(baseTask);
                            this.meterHelper.incrementCounter("twTasks.tasksResumer.scheduledTasks.resumedCount", 1L);
                        });
                    }
                }
                if (log.isDebugEnabled() && stuckTasks.getStuckTasks().size() > 0) {
                    log.debug("Resumed " + stuckTasks.getStuckTasks().size() + " waiting tasks.");
                }
            } catch (Throwable th) {
                log.error(th.getMessage(), th);
                return;
            }
        } while (stuckTasks.isHasMore());
    }

    protected void handleStuckTask(ITaskDao.StuckTask stuckTask, AtomicInteger atomicInteger, AtomicInteger atomicInteger2, AtomicInteger atomicInteger3) {
        ITaskProcessingPolicy.StuckTaskResolutionStrategy stuckTaskResolutionStrategy = null;
        if (!TaskStatus.PROCESSING.name().equals(stuckTask.getStatus())) {
            retryTask(stuckTask, atomicInteger);
            return;
        }
        ITaskHandler taskHandler = this.taskHandlerRegistry.getTaskHandler(stuckTask);
        if (taskHandler == null) {
            log.error("No task handler found for task " + LogUtils.asParameter(stuckTask.getVersionId()) + ".");
        } else {
            ITaskProcessingPolicy processingPolicy = taskHandler.getProcessingPolicy(stuckTask);
            if (processingPolicy == null) {
                log.error("No processing policy found for task " + LogUtils.asParameter(stuckTask.getVersionId()) + ".");
            } else {
                stuckTaskResolutionStrategy = processingPolicy.getStuckTaskResolutionStrategy(stuckTask);
            }
        }
        if (stuckTaskResolutionStrategy == null) {
            log.error("No processing policy found for task " + LogUtils.asParameter(stuckTask.getVersionId()) + ".");
            markTaskAsError(stuckTask, atomicInteger2);
            return;
        }
        ITaskProcessingPolicy.StuckTaskResolutionStrategy stuckTaskResolutionStrategy2 = this.taskHandlerRegistry.getTaskHandler(stuckTask).getProcessingPolicy(stuckTask).getStuckTaskResolutionStrategy(stuckTask);
        switch (stuckTaskResolutionStrategy2) {
            case RETRY:
                retryTask(stuckTask, atomicInteger);
                return;
            case MARK_AS_ERROR:
                markTaskAsError(stuckTask, atomicInteger2);
                return;
            case MARK_AS_FAILED:
                this.taskDao.setStatus(stuckTask.getVersionId().getId(), TaskStatus.FAILED, stuckTask.getVersionId().getVersion());
                atomicInteger3.getAndIncrement();
                this.meterHelper.incrementCounter("twTasks.tasksResumer.stuckTasks.markFailedCount", 1L);
                this.meterHelper.registerTaskMarkedAsFailed(null, stuckTask.getType());
                return;
            case IGNORE:
                this.meterHelper.incrementCounter("twTasks.tasksResumer.stuckTasks.ignoredCount", 1L);
                return;
            default:
                throw new UnsupportedOperationException("Resolution strategy " + stuckTaskResolutionStrategy2 + " is not supported.");
        }
    }

    protected void retryTask(ITaskDao.StuckTask stuckTask, AtomicInteger atomicInteger) {
        this.taskDao.markAsSubmittedAndSetNextEventTime(stuckTask.getVersionId(), getMaxStuckTime());
        BaseTask baseTask = (BaseTask) DomainUtils.convert(stuckTask, BaseTask.class);
        baseTask.setVersion(baseTask.getVersion() + 1);
        this.tasksExecutionTriggerer.trigger(baseTask);
        atomicInteger.getAndIncrement();
        this.meterHelper.incrementCounter("twTasks.tasksResumer.stuckTasks.resumedCount", 1L);
        this.meterHelper.registerTaskResuming(null, stuckTask.getType());
    }

    protected void markTaskAsError(IBaseTask iBaseTask, AtomicInteger atomicInteger) {
        this.taskDao.setStatus(iBaseTask.getVersionId().getId(), TaskStatus.ERROR, iBaseTask.getVersionId().getVersion());
        atomicInteger.getAndIncrement();
        this.meterHelper.incrementCounter("twTasks.tasksResumer.stuckTasks.markErrorCount", 1L);
        this.meterHelper.registerTaskMarkedAsError(null, iBaseTask.getType());
    }

    public void pause() {
        this.paused = true;
    }

    public void resume() {
        this.paused = false;
    }

    private ZonedDateTime getMaxStuckTime() {
        return ZonedDateTime.now(ClockHolder.getClock()).plus((TemporalAmount) this.tasksProperties.getTaskStuckTimeout());
    }

    public void applicationStarted() {
        this.executorServicesProvider.getGlobalExecutorService().submit(this::resumeTasksForClient);
        this.leaderSelector.start();
    }

    @Trace(dispatcher = true)
    protected void resumeTasksForClient() {
        NewRelic.setTransactionName("TwTasksEngine", "TasksForClientResuming");
        try {
            log.info("Checking if we can immediately resume this client's stuck tasks.");
            for (ITaskDao.StuckTask stuckTask : this.taskDao.prepareStuckOnProcessingTasksForResuming(this.tasksProperties.getClientId(), getMaxStuckTime())) {
                if (this.shuttingDown) {
                    break;
                } else {
                    MdcContext.with(() -> {
                        MdcContext.put(this.tasksProperties.getTwTaskVersionIdMdcKey(), stuckTask.getVersionId());
                        log.info("Resuming client '" + this.tasksProperties.getClientId() + "' task " + LogUtils.asParameter(stuckTask.getVersionId()) + " of type '" + stuckTask.getType() + "' in status '" + stuckTask.getStatus() + "'.");
                        this.tasksExecutionTriggerer.trigger((BaseTask) DomainUtils.convert(stuckTask, BaseTask.class));
                        this.meterHelper.incrementCounter("twTasks.tasksResumer.clientStuckTasks.resumedCount", 1L);
                        this.meterHelper.registerTaskResuming(null, stuckTask.getType());
                    });
                }
            }
        } catch (Throwable th) {
            log.error(th.getMessage(), th);
        }
    }

    public void prepareForShutdown() {
        this.shuttingDown = true;
        this.leaderSelector.stop();
    }

    public boolean canShutdown() {
        return this.leaderSelector.hasStopped();
    }

    public TasksResumer(ITasksExecutionTriggerer iTasksExecutionTriggerer, ITaskHandlerRegistry iTaskHandlerRegistry, TasksProperties tasksProperties, ITaskDao iTaskDao, CuratorFramework curatorFramework, IExecutorServicesProvider iExecutorServicesProvider, IMeterHelper iMeterHelper) {
        this.tasksExecutionTriggerer = iTasksExecutionTriggerer;
        this.taskHandlerRegistry = iTaskHandlerRegistry;
        this.tasksProperties = tasksProperties;
        this.taskDao = iTaskDao;
        this.curatorFramework = curatorFramework;
        this.executorServicesProvider = iExecutorServicesProvider;
        this.meterHelper = iMeterHelper;
    }
}
