package com.transferwise.tasks.triggering;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.transferwise.common.baseutils.ExceptionUtils;
import com.transferwise.common.baseutils.clock.ClockHolder;
import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy;
import com.transferwise.tasks.TasksProperties;
import com.transferwise.tasks.buckets.BucketProperties;
import com.transferwise.tasks.buckets.IBucketsManager;
import com.transferwise.tasks.dao.ITaskDao;
import com.transferwise.tasks.domain.BaseTask;
import com.transferwise.tasks.domain.TaskStatus;
import com.transferwise.tasks.handler.interfaces.ITaskHandler;
import com.transferwise.tasks.handler.interfaces.ITaskHandlerRegistry;
import com.transferwise.tasks.helpers.IErrorLoggingThrottler;
import com.transferwise.tasks.helpers.IMeterHelper;
import com.transferwise.tasks.helpers.executors.IExecutorsHelper;
import com.transferwise.tasks.helpers.kafka.ITopicPartitionsManager;
import com.transferwise.tasks.mdc.MdcContext;
import com.transferwise.tasks.processing.ITasksProcessingService;
import com.transferwise.tasks.processing.ProcessingState;
import com.transferwise.tasks.utils.JsonUtils;
import com.transferwise.tasks.utils.LogUtils;
import com.transferwise.tasks.utils.WaitUtils;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidOffsetException;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Lazy;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Transactional(propagation = Propagation.NEVER, rollbackFor = {Exception.class})
/* loaded from: input_file:com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.class */
public class KafkaTasksExecutionTriggerer implements ITasksExecutionTriggerer, GracefulShutdownStrategy {
    private static final Logger log = LoggerFactory.getLogger(KafkaTasksExecutionTriggerer.class);

    // Source of the base Kafka consumer properties (see init()).
    @Autowired
    private KafkaProperties kafkaProperties;

    // Receives the triggered tasks and notifies this class when a triggering has finished.
    @Autowired
    private ITasksProcessingService tasksProcessingService;

    // Used to mark tasks as ERROR when they can not be triggered.
    @Autowired
    private ITaskDao taskDao;

    // Producer side: sends the trigger messages.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private TasksProperties tasksProperties;

    // Serializes/deserializes the tasks carried in trigger messages.
    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    @Lazy
    private ITaskHandlerRegistry taskHandlerRegistry;

    @Autowired
    private IExecutorsHelper executorsHelper;

    @Autowired
    private ITopicPartitionsManager topicPartitionsManager;

    @Autowired
    private ProcessingState processingState;

    @Autowired
    private IBucketsManager bucketsManager;

    @Autowired
    private IErrorLoggingThrottler errorLoggingThrottler;

    @Autowired
    private IMeterHelper meterHelper;
    // Runs one polling job per bucket; created in init(), shut down in prepareForShutdown().
    private ExecutorService executorService;
    // Set by prepareForShutdown(); checked by all polling and waiting loops.
    private volatile boolean shuttingDown;
    // Base name of the trigger topic; bucket id and namespace are added in getTopic().
    private String triggerTopic;
    // Base consumer properties, copied and adjusted per bucket in getConsumerBucket().
    private Map<String, Object> kafkaConsumerProps;
    // Consumer side state, keyed by bucket id.
    private Map<String, ConsumerBucket> consumerBuckets = new ConcurrentHashMap();
    // Number of poll() invocations currently in progress; exposed as a gauge.
    private AtomicInteger pollingBucketsCount = new AtomicInteger();

    /* loaded from: input_file:com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer$ConsumerBucket.class */
    /**
     * Consumer side state of a single bucket: its Kafka consumer, the offsets which have been
     * polled but not yet committed and some counters exposed as metrics.
     *
     * <p>All setters return {@code this}, so they can be chained.
     */
    public static class ConsumerBucket {
        private String bucketId;
        private long lastCommitTime = System.currentTimeMillis();
        private KafkaConsumer<String, String> kafkaConsumer;
        private Map<TopicPartition, ConsumerTopicPartition> consumerTopicPartitions = new ConcurrentHashMap<>();
        private Lock offsetsStorageLock = new ReentrantLock();
        private Map<TopicPartition, OffsetAndMetadata> offsetsToBeCommited = new ConcurrentHashMap<>();
        private int unprocessedFetchedRecordsCount;
        private boolean topicConfigured;

        // ---- metrics ----

        /** Returns the number of topic partitions having an offset waiting to be committed. */
        public int getOffsetsToBeCommitedCount() {
            return offsetsToBeCommited.size();
        }

        /** Returns the total number of registered offsets over all topic partitions. */
        public int getOffsetsCount() {
            int total = 0;
            for (ConsumerTopicPartition partition : consumerTopicPartitions.values()) {
                total += partition.getOffsets().size();
            }
            return total;
        }

        /** Returns the total number of entries in the completed-offsets maps over all topic partitions. */
        public int getOffsetsCompletedCount() {
            int total = 0;
            for (ConsumerTopicPartition partition : consumerTopicPartitions.values()) {
                total += partition.getOffsetsCompleted().size();
            }
            return total;
        }

        /** Decreases the unprocessed fetched records counter by one. */
        public void decrementUnprocessedFetchedRecordsCount() {
            unprocessedFetchedRecordsCount -= 1;
        }

        // ---- accessors ----

        public String getBucketId() {
            return bucketId;
        }

        public ConsumerBucket setBucketId(String str) {
            bucketId = str;
            return this;
        }

        public long getLastCommitTime() {
            return lastCommitTime;
        }

        public ConsumerBucket setLastCommitTime(long j) {
            lastCommitTime = j;
            return this;
        }

        public KafkaConsumer<String, String> getKafkaConsumer() {
            return kafkaConsumer;
        }

        public ConsumerBucket setKafkaConsumer(KafkaConsumer<String, String> kafkaConsumer) {
            this.kafkaConsumer = kafkaConsumer;
            return this;
        }

        public Map<TopicPartition, ConsumerTopicPartition> getConsumerTopicPartitions() {
            return consumerTopicPartitions;
        }

        public ConsumerBucket setConsumerTopicPartitions(Map<TopicPartition, ConsumerTopicPartition> map) {
            consumerTopicPartitions = map;
            return this;
        }

        public Lock getOffsetsStorageLock() {
            return offsetsStorageLock;
        }

        public ConsumerBucket setOffsetsStorageLock(Lock lock) {
            offsetsStorageLock = lock;
            return this;
        }

        public Map<TopicPartition, OffsetAndMetadata> getOffsetsToBeCommited() {
            return offsetsToBeCommited;
        }

        public ConsumerBucket setOffsetsToBeCommited(Map<TopicPartition, OffsetAndMetadata> map) {
            offsetsToBeCommited = map;
            return this;
        }

        public int getUnprocessedFetchedRecordsCount() {
            return unprocessedFetchedRecordsCount;
        }

        public ConsumerBucket setUnprocessedFetchedRecordsCount(int i) {
            unprocessedFetchedRecordsCount = i;
            return this;
        }

        public boolean isTopicConfigured() {
            return topicConfigured;
        }

        public ConsumerBucket setTopicConfigured(boolean z) {
            topicConfigured = z;
            return this;
        }

        // ---- equals / hashCode / toString over all fields ----

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof ConsumerBucket)) {
                return false;
            }
            ConsumerBucket other = (ConsumerBucket) obj;
            return other.canEqual(this)
                    && nullSafeEquals(getBucketId(), other.getBucketId())
                    && getLastCommitTime() == other.getLastCommitTime()
                    && nullSafeEquals(getKafkaConsumer(), other.getKafkaConsumer())
                    && nullSafeEquals(getConsumerTopicPartitions(), other.getConsumerTopicPartitions())
                    && nullSafeEquals(getOffsetsStorageLock(), other.getOffsetsStorageLock())
                    && nullSafeEquals(getOffsetsToBeCommited(), other.getOffsetsToBeCommited())
                    && getUnprocessedFetchedRecordsCount() == other.getUnprocessedFetchedRecordsCount()
                    && isTopicConfigured() == other.isTopicConfigured();
        }

        /** Lets subclasses opt out of being equal to a plain {@code ConsumerBucket}. */
        protected boolean canEqual(Object obj) {
            return obj instanceof ConsumerBucket;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            int result = 1;
            result = result * prime + hashOrDefault(getBucketId());
            result = result * prime + Long.hashCode(getLastCommitTime());
            result = result * prime + hashOrDefault(getKafkaConsumer());
            result = result * prime + hashOrDefault(getConsumerTopicPartitions());
            result = result * prime + hashOrDefault(getOffsetsStorageLock());
            result = result * prime + hashOrDefault(getOffsetsToBeCommited());
            result = result * prime + getUnprocessedFetchedRecordsCount();
            result = result * prime + (isTopicConfigured() ? 79 : 97);
            return result;
        }

        @Override
        public String toString() {
            return "KafkaTasksExecutionTriggerer.ConsumerBucket(bucketId=" + getBucketId()
                    + ", lastCommitTime=" + getLastCommitTime()
                    + ", kafkaConsumer=" + getKafkaConsumer()
                    + ", consumerTopicPartitions=" + getConsumerTopicPartitions()
                    + ", offsetsStorageLock=" + getOffsetsStorageLock()
                    + ", offsetsToBeCommited=" + getOffsetsToBeCommited()
                    + ", unprocessedFetchedRecordsCount=" + getUnprocessedFetchedRecordsCount()
                    + ", topicConfigured=" + isTopicConfigured()
                    + ")";
        }

        /** Null-safe equality: two nulls are equal, a null never equals a non-null. */
        private static boolean nullSafeEquals(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash code of the value, or 43 for null. */
        private static int hashOrDefault(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /* loaded from: input_file:com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer$ConsumerTopicPartition.class */
    /**
     * Offsets bookkeeping of a single topic partition: the sorted set of registered offsets and
     * a map telling which of them have been completed.
     *
     * <p>Setters return {@code this}, so they can be chained.
     */
    public static class ConsumerTopicPartition {
        private TreeSet<Long> offsets = new TreeSet<>();
        private Map<Long, Boolean> offsetsCompleted = new HashMap<>();

        /**
         * Tells whether the given offset is marked as completed.
         *
         * @param l the offset to check
         * @return true only when the offset is mapped to {@code true} in the completed map
         */
        public boolean isDone(Long l) {
            return Boolean.TRUE.equals(offsetsCompleted.get(l));
        }

        public TreeSet<Long> getOffsets() {
            return offsets;
        }

        public ConsumerTopicPartition setOffsets(TreeSet<Long> treeSet) {
            offsets = treeSet;
            return this;
        }

        public Map<Long, Boolean> getOffsetsCompleted() {
            return offsetsCompleted;
        }

        public ConsumerTopicPartition setOffsetsCompleted(Map<Long, Boolean> map) {
            offsetsCompleted = map;
            return this;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof ConsumerTopicPartition)) {
                return false;
            }
            ConsumerTopicPartition other = (ConsumerTopicPartition) obj;
            return other.canEqual(this)
                    && nullSafeEquals(getOffsets(), other.getOffsets())
                    && nullSafeEquals(getOffsetsCompleted(), other.getOffsetsCompleted());
        }

        /** Lets subclasses opt out of being equal to a plain {@code ConsumerTopicPartition}. */
        protected boolean canEqual(Object obj) {
            return obj instanceof ConsumerTopicPartition;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            int result = 1;
            result = result * prime + hashOrDefault(getOffsets());
            result = result * prime + hashOrDefault(getOffsetsCompleted());
            return result;
        }

        @Override
        public String toString() {
            return "KafkaTasksExecutionTriggerer.ConsumerTopicPartition(offsets=" + getOffsets()
                    + ", offsetsCompleted=" + getOffsetsCompleted()
                    + ")";
        }

        /** Null-safe equality: two nulls are equal, a null never equals a non-null. */
        private static boolean nullSafeEquals(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash code of the value, or 43 for null. */
        private static int hashOrDefault(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /**
     * Prepares the triggerer after dependency injection: creates the executor, derives the
     * trigger topic name and the base consumer properties, and registers the
     * triggering-finished listener and the polling buckets gauge.
     */
    @PostConstruct
    public void init() {
        executorService = executorsHelper.newCachedExecutor("ktet");
        triggerTopic = IMeterHelper.METRIC_PREFIX + tasksProperties.getGroupId() + ".executeTask";
        kafkaConsumerProps = kafkaProperties.buildConsumerProperties();

        tasksProcessingService.addTaskTriggeringFinishedListener(taskTriggering -> {
            // Same process triggers did not come through Kafka, so there is no offset to release.
            if (!taskTriggering.isSameProcessTrigger()) {
                ConsumerBucket consumerBucket = consumerBuckets.get(taskTriggering.getBucketId());
                releaseCompletedOffset(consumerBucket, taskTriggering.getTopicPartition(), taskTriggering.getOffset());
            }
        });

        meterHelper.registerGauge("twTasks.kafkaTasksExecutionTriggerer.pollingBucketsCount",
                () -> Integer.valueOf(pollingBucketsCount.get()));
    }

    /**
     * Triggers the execution of the given task.
     *
     * <p>The task is marked as ERROR when no handler is registered for it or when its handler
     * points to a bucket which is not configured. When the bucket is configured to trigger in
     * the same process and the processing service accepts the task, nothing is sent to Kafka.
     * Otherwise a trigger message is sent to the bucket's topic; the result of the send is only
     * logged.
     *
     * @param baseTask the task to trigger
     */
    @Override // com.transferwise.tasks.triggering.ITasksExecutionTriggerer
    public void trigger(BaseTask baseTask) {
        ITaskHandler taskHandler = this.taskHandlerRegistry.getTaskHandler(baseTask);
        if (taskHandler == null) {
            // Both placeholders now have an argument; previously the task id placeholder had none.
            log.error("Marking task {} as ERROR, because no task handler was found for type '{}'.",
                    LogUtils.asParameter(baseTask.getVersionId()), baseTask.getType());
            markTaskAsError(baseTask, null);
            return;
        }

        String processingBucket = taskHandler.getProcessingPolicy(baseTask).getProcessingBucket(baseTask);
        if (!this.bucketsManager.isConfiguredBucket(processingBucket)) {
            log.error("Marking task {} as ERROR, because task handler has unknown bucket '{}'.",
                    LogUtils.asParameter(baseTask.getVersionId()), processingBucket);
            markTaskAsError(baseTask, processingBucket);
            return;
        }

        if (BooleanUtils.isTrue(this.bucketsManager.getBucketProperties(processingBucket).getTriggerInSameProcess())) {
            TaskTriggering taskTriggering = new TaskTriggering().setTask(baseTask).setBucketId(processingBucket);
            if (this.tasksProcessingService.addTaskForProcessing(taskTriggering).getResult()
                    == ITasksProcessingService.AddTaskForProcessingResponse.ResultCode.OK) {
                return;
            }
            // Not accepted in the same process, fall back to triggering through Kafka.
        }

        this.kafkaTemplate.send(getTopic(processingBucket), UUID.randomUUID().toString(), JsonUtils.toJson(this.objectMapper, baseTask)).addCallback(sendResult -> {
            if (log.isDebugEnabled()) {
                MdcContext.with(() -> {
                    MdcContext.put(this.tasksProperties.getTwTaskVersionIdMdcKey(), baseTask.getVersionId());
                    log.debug("Task '{}' triggering acknowledged by Kafka.", baseTask.getVersionId());
                });
            }
        }, th -> {
            if (log.isDebugEnabled() || this.errorLoggingThrottler.canLogError()) {
                MdcContext.with(() -> {
                    MdcContext.put(this.tasksProperties.getTwTaskVersionIdMdcKey(), baseTask.getVersionId());
                    log.error("Task {} triggering failed through Kafka.", LogUtils.asParameter(baseTask.getVersionId()), th);
                });
            }
        });
    }

    /** Counts the task as marked-as-error and sets its status to ERROR, logging when the status update fails. */
    private void markTaskAsError(BaseTask baseTask, String bucketId) {
        this.meterHelper.registerTaskMarkedAsError(bucketId, baseTask.getType());
        if (!this.taskDao.setStatus(baseTask.getId(), TaskStatus.ERROR, baseTask.getVersion())) {
            log.error("Marking task {} as ERROR failed, version may have changed.", LogUtils.asParameter(baseTask.getVersionId()));
        }
    }

    /**
     * Returns the consumer bucket for the given bucket id, creating and initializing whatever
     * is missing: the bucket itself (with its gauges), the partitions count of the triggering
     * topic and a subscribed Kafka consumer.
     *
     * <p>When smart auto offset reset is enabled, a rebalance listener positions every assigned
     * partition without a valid offset to the first record not older than "now minus task stuck
     * timeout", or to the beginning when no such record exists.
     *
     * @param str the bucket id
     * @return the bucket with a non-null Kafka consumer
     * @throws IllegalStateException when the consumer properties contain no "group.id"
     */
    protected ConsumerBucket getConsumerBucket(String str) {
        return (ConsumerBucket) ExceptionUtils.doUnchecked(() -> {
            ConsumerBucket consumerBucket = this.consumerBuckets.get(str);
            if (consumerBucket == null) {
                consumerBucket = new ConsumerBucket().setBucketId(str);
                this.consumerBuckets.put(str, consumerBucket);

                Map<String, String> tags = ImmutableMap.of("bucketId", str);
                this.meterHelper.registerGauge("twTasks.kafkaTasksExecutionTriggerer.offsetsToBeCommitedCount", tags, consumerBucket::getOffsetsToBeCommitedCount);
                this.meterHelper.registerGauge("twTasks.kafkaTasksExecutionTriggerer.offsetsCompletedCount", tags, consumerBucket::getOffsetsCompletedCount);
                this.meterHelper.registerGauge("twTasks.kafkaTasksExecutionTriggerer.unprocessedFetchedRecordsCount", tags, consumerBucket::getUnprocessedFetchedRecordsCount);
                this.meterHelper.registerGauge("twTasks.kafkaTasksExecutionTriggerer.offsetsCount", tags, consumerBucket::getOffsetsCount);
            }

            BucketProperties bucketProperties = this.bucketsManager.getBucketProperties(str);
            if (!consumerBucket.isTopicConfigured()) {
                this.topicPartitionsManager.setPartitionsCount(getTopic(str), bucketProperties.getTriggeringTopicPartitionsCount().intValue());
                consumerBucket.setTopicConfigured(true);
            }

            if (consumerBucket.getKafkaConsumer() == null) {
                String groupId = (String) this.kafkaConsumerProps.get("group.id");
                if (groupId == null) {
                    throw new IllegalStateException("Kafka consumer group id is not set.");
                }
                if (bucketProperties.getTriggerSameTaskInAllNodes().booleanValue()) {
                    log.info("Using same task triggering on all nodes strategy for bucket '" + str + "'.");
                    // A group id per client makes every node receive every trigger.
                    groupId = groupId + "." + this.tasksProperties.getClientId();
                }

                Map<String, Object> consumerProps = new HashMap<>(this.kafkaConsumerProps);
                consumerProps.put("max.poll.records", bucketProperties.getTriggersFetchSize());
                consumerProps.put("group.id", groupId);
                consumerProps.put("enable.auto.commit", false);
                consumerProps.put("client.id", consumerProps.getOrDefault("client.id", "") + ".tw-tasks.bucket." + str);
                // NOTE(review): a null value is kept here as in the original; the listener below
                // relies on position() throwing InvalidOffsetException, which presumably needs
                // the "none" policy - confirm the intended value.
                consumerProps.put("auto.offset.reset", this.tasksProperties.useSmartAutoOffsetReset() ? null : this.tasksProperties.getAutoResetOffsetTo());

                final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
                consumerBucket.setKafkaConsumer(kafkaConsumer);

                List<String> topics = getTopics(str);
                log.info("Subscribing to Kafka topics '" + topics + "'");
                kafkaConsumer.subscribe(topics, this.tasksProperties.useSmartAutoOffsetReset() ? new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                        // offsetsForTimes() works with record timestamps, which are milliseconds since epoch.
                        long resetTimestampMs = ZonedDateTime.now(ClockHolder.getClock())
                                .minus((TemporalAmount) KafkaTasksExecutionTriggerer.this.tasksProperties.getTaskStuckTimeout())
                                .toInstant()
                                .toEpochMilli();

                        // Only partitions without a valid position need to be repositioned.
                        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                        for (TopicPartition topicPartition : collection) {
                            try {
                                kafkaConsumer.position(topicPartition);
                            } catch (InvalidOffsetException e) {
                                timestampsToSearch.put(topicPartition, resetTimestampMs);
                            }
                        }

                        List<TopicPartition> partitionsWithoutOffset = new ArrayList<>();
                        if (!timestampsToSearch.isEmpty()) {
                            kafkaConsumer.offsetsForTimes(timestampsToSearch).forEach((topicPartition, offsetAndTimestamp) -> {
                                if (offsetAndTimestamp != null) {
                                    kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
                                } else {
                                    // No record at or after the timestamp.
                                    partitionsWithoutOffset.add(topicPartition);
                                }
                            });
                        }
                        if (!partitionsWithoutOffset.isEmpty()) {
                            kafkaConsumer.seekToBeginning(partitionsWithoutOffset);
                        }
                    }
                } : new NoOpConsumerRebalanceListener());
            }
            return consumerBucket;
        });
    }

    /**
     * Polls the trigger messages of the given bucket and hands the tasks over for processing,
     * until shutdown is requested or an error occurs.
     *
     * <p>When the processing service reports that it is full, the method waits for the bucket's
     * processing state version to change and tries again. The Kafka consumer of the bucket is
     * closed when the method exits, normally or with an exception.
     *
     * @param str the bucket id
     */
    public void poll(String str) {
        log.info("Started to listen tasks triggers in bucket '" + str + "'.");
        try {
            this.pollingBucketsCount.incrementAndGet();
            ConsumerBucket consumerBucket = getConsumerBucket(str);
            ProcessingState.Bucket bucket = this.processingState.getBuckets().get(str);

            while (!this.shuttingDown) {
                ConsumerRecords<String, String> consumerRecords = consumerBucket.getKafkaConsumer().poll(this.tasksProperties.getGenericMediumDelay());
                commitOffsets(consumerBucket, false);
                consumerBucket.setUnprocessedFetchedRecordsCount(consumerRecords.count());

                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                    long offset = consumerRecord.offset();
                    registerPolledOffset(consumerBucket, topicPartition, offset);
                    log.debug("Received Kafka message from topic '{}' partition {} offset {}.", consumerRecord.topic(), consumerRecord.partition(), offset);

                    BaseTask baseTask = (BaseTask) JsonUtils.fromJson(this.objectMapper, consumerRecord.value(), BaseTask.class);
                    MdcContext.with(() -> {
                        MdcContext.put(this.tasksProperties.getTwTaskVersionIdMdcKey(), baseTask.getVersionId());
                        TaskTriggering taskTriggering = new TaskTriggering().setTask(baseTask).setBucketId(str).setOffset(offset).setTopicPartition(topicPartition);
                        this.meterHelper.incrementCounter("twTasks.kafkaTasksExecutionTriggerer.receivedTriggersCount", ImmutableMap.of("bucketId", str), 1L);

                        while (!this.shuttingDown) {
                            // Read the version before adding, so a change happening in between is not missed.
                            long processingStateVersion = bucket.getVersion().get();
                            log.debug("Adding task '{}' for processing.", baseTask.getVersionId());
                            if (this.tasksProcessingService.addTaskForProcessing(taskTriggering).getResult() != ITasksProcessingService.AddTaskForProcessingResponse.ResultCode.FULL) {
                                break;
                            }

                            // Processing is full: wait until the state version changes. The lock is held
                            // over the whole wait loop and released exactly once, however many times
                            // (including zero) the loop iterates.
                            bucket.getVersionLock().lock();
                            try {
                                while (bucket.getVersion().get() == processingStateVersion && !this.shuttingDown) {
                                    try {
                                        bucket.getVersionCondition().await(this.tasksProperties.getGenericMediumDelay().toMillis(), TimeUnit.MILLISECONDS);
                                    } catch (InterruptedException e) {
                                        log.error(e.getMessage(), e);
                                    }
                                }
                            } finally {
                                bucket.getVersionLock().unlock();
                            }
                        }
                        consumerBucket.decrementUnprocessedFetchedRecordsCount();
                    });
                }
            }
        } finally {
            this.pollingBucketsCount.decrementAndGet();
            closeKafkaConsumer(this.consumerBuckets.get(str));
        }
    }

    /**
     * Registers a freshly polled offset for the given topic partition, creating the partition's
     * bookkeeping entry when it does not exist yet. Any stale "completed" mark of the same
     * offset is removed.
     *
     * @param consumerBucket the bucket the record was polled for
     * @param topicPartition the topic partition of the record
     * @param j the offset of the record
     */
    private void registerPolledOffset(ConsumerBucket consumerBucket, TopicPartition topicPartition, long j) {
        ConsumerTopicPartition partitionState = consumerBucket.getConsumerTopicPartitions()
                .computeIfAbsent(topicPartition, ignored -> new ConsumerTopicPartition());

        consumerBucket.getOffsetsStorageLock().lock();
        try {
            Long polledOffset = Long.valueOf(j);
            partitionState.getOffsetsCompleted().remove(polledOffset);
            partitionState.getOffsets().add(polledOffset);
        } finally {
            consumerBucket.getOffsetsStorageLock().unlock();
        }
    }

    /**
     * Marks the given offset as completed and, when it is the smallest registered offset of
     * its partition, moves the committable position forward over all consecutive completed
     * offsets.
     *
     * @param consumerBucket the bucket the offset belongs to
     * @param topicPartition the topic partition of the offset
     * @param j the completed offset
     */
    private void releaseCompletedOffset(ConsumerBucket consumerBucket, TopicPartition topicPartition, long j) {
        Lock offsetsStorageLock = consumerBucket.getOffsetsStorageLock();
        offsetsStorageLock.lock();
        try {
            ConsumerTopicPartition consumerTopicPartition = consumerBucket.getConsumerTopicPartitions().get(topicPartition);
            TreeSet<Long> offsets = consumerTopicPartition.getOffsets();
            if (!offsets.contains(j)) {
                // The lock is released by the finally block only; unlocking here as well
                // would make the second unlock throw IllegalMonitorStateException.
                log.warn("Offset " + j + " has already been commited.");
                return;
            }

            consumerTopicPartition.getOffsetsCompleted().put(j, Boolean.TRUE);
            if (offsets.first() == j) {
                // Everything up to the first not yet completed offset can be committed.
                while (!offsets.isEmpty()) {
                    Long firstOffset = offsets.first();
                    if (!consumerTopicPartition.isDone(firstOffset)) {
                        break;
                    }
                    // Kafka expects the offset of the next record to be consumed.
                    consumerBucket.getOffsetsToBeCommited().put(topicPartition, new OffsetAndMetadata(firstOffset + 1));
                    offsets.pollFirst();
                    consumerTopicPartition.getOffsetsCompleted().remove(firstOffset);
                }
            }
        } finally {
            offsetsStorageLock.unlock();
        }
    }

    /**
     * Commits the offsets waiting to be committed to Kafka.
     *
     * <p>Nothing is done when there is nothing to commit, or when the commit is not forced and
     * the last commit happened less than the generic medium delay ago. Commit failures are
     * registered through {@link #registerCommitException(String, Throwable)} and not
     * propagated; the pending offsets are cleared in either case.
     *
     * @param consumerBucket the bucket whose offsets are committed
     * @param z true to commit synchronously and regardless of the last commit time
     */
    private void commitOffsets(ConsumerBucket consumerBucket, boolean z) {
        if (!z && System.currentTimeMillis() - consumerBucket.getLastCommitTime() < this.tasksProperties.getGenericMediumDelay().toMillis()) {
            return;
        }

        String bucketId = consumerBucket.getBucketId();
        Lock offsetsStorageLock = consumerBucket.getOffsetsStorageLock();
        offsetsStorageLock.lock();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsetsToBeCommited = consumerBucket.getOffsetsToBeCommited();
            if (offsetsToBeCommited.isEmpty()) {
                return;
            }
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Commiting bucket '" + bucketId + "' offsets to Kafka: " + offsetsToBeCommited.entrySet().stream()
                            .map(entry -> entry.getKey() + ":" + entry.getValue().offset())
                            .collect(Collectors.joining(", ")));
                }
                this.meterHelper.incrementCounter("twTasks.kafkaTasksExecutionTriggerer.commitsCount", ImmutableMap.of("bucketId", bucketId), 1L);
                if (z) {
                    consumerBucket.getKafkaConsumer().commitSync(offsetsToBeCommited);
                } else {
                    consumerBucket.getKafkaConsumer().commitAsync(offsetsToBeCommited, (map, exc) -> {
                        if (exc != null) {
                            registerCommitException(bucketId, exc);
                        }
                    });
                }
            } catch (Throwable th) {
                registerCommitException(bucketId, th);
            }
            offsetsToBeCommited.clear();
        } finally {
            // The only place where the lock is released; releasing it in the try block as well
            // would make this unlock throw IllegalMonitorStateException.
            offsetsStorageLock.unlock();
        }
        consumerBucket.setLastCommitTime(System.currentTimeMillis());
    }

    /**
     * Registers a failed offsets commit. {@link CommitFailedException} and
     * {@link RetriableException} are counted and logged on debug level; anything else is
     * logged as an error when the error logging throttler allows it.
     *
     * @param str the bucket id
     * @param th the commit failure
     */
    protected void registerCommitException(String str, Throwable th) {
        boolean countedFailure = th instanceof CommitFailedException || th instanceof RetriableException;
        if (!countedFailure) {
            if (errorLoggingThrottler.canLogError()) {
                log.error("Committing Kafka offset failed for bucket '" + str + "'.", th);
            }
            return;
        }
        meterHelper.incrementCounter("twTasks.kafkaTasksExecutionTriggerer.failedCommitsCount", ImmutableMap.of("bucketId", str), 1L);
        log.debug("Committing Kafka offset failed for bucket '" + str + "'.", th);
    }

    /**
     * Commits the pending offsets synchronously and then unsubscribes and closes the Kafka
     * consumer of the given bucket. Every step is best effort: a failure is logged and the
     * remaining steps are still executed, so the consumer is always released.
     *
     * @param consumerBucket the bucket to close the consumer for; may be null or have no consumer
     */
    private void closeKafkaConsumer(ConsumerBucket consumerBucket) {
        if (consumerBucket == null) {
            return;
        }
        KafkaConsumer<String, String> kafkaConsumer = consumerBucket.getKafkaConsumer();
        if (kafkaConsumer == null) {
            return;
        }

        try {
            commitOffsets(consumerBucket, true);
        } catch (Throwable th) {
            // A failing last commit must not prevent the consumer from being closed.
            log.error(th.getMessage(), th);
        }
        try {
            kafkaConsumer.unsubscribe();
        } catch (Throwable th) {
            log.error(th.getMessage(), th);
        }
        try {
            kafkaConsumer.close();
        } catch (Throwable th) {
            log.error(th.getMessage(), th);
        }
        log.info("Closed Kafka consumer for bucket '" + consumerBucket.getBucketId() + "'.");
        consumerBucket.setKafkaConsumer(null);
    }

    /**
     * Builds the name of the triggering topic of a bucket: the base trigger topic, suffixed
     * with the bucket id when it is not empty and prefixed with the Kafka topics namespace when
     * one is configured.
     *
     * @param str the bucket id, may be empty
     * @return the topic name
     */
    private String getTopic(String str) {
        String topic = triggerTopic;
        if (!StringUtils.isEmpty(str)) {
            topic += "." + str;
        }
        String namespace = tasksProperties.getKafkaTopicsNamespace();
        return StringUtils.isEmpty(namespace) ? topic : namespace + "." + topic;
    }

    /**
     * Returns all the topics to subscribe to for a bucket: the bucket's own topic followed by
     * the same topic prefixed with every configured data center prefix.
     *
     * @param str the bucket id
     * @return a mutable list of topic names, never empty
     */
    private List<String> getTopics(String str) {
        List<String> topics = new ArrayList<>();
        String topic = getTopic(str);
        topics.add(topic);

        // StringUtils.split() returns null for a null input, i.e. when no prefixes are configured.
        String[] dataCenterPrefixes = StringUtils.split(this.tasksProperties.getKafkaDataCenterPrefixes(), ",");
        if (dataCenterPrefixes != null) {
            for (String dataCenterPrefix : dataCenterPrefixes) {
                topics.add(dataCenterPrefix + topic);
            }
        }
        return topics;
    }

    /**
     * Starts one background polling job for every bucket known to the buckets manager.
     */
    public void applicationStarted() {
        for (String bucketId : bucketsManager.getBucketIds()) {
            executorService.submit(() -> pollBucketUntilShutdown(bucketId));
        }
    }

    /**
     * Keeps polling the given bucket until shutdown is requested. After a failure the consumer
     * is closed and polling is restarted after the generic medium delay.
     */
    private void pollBucketUntilShutdown(String bucketId) {
        while (!shuttingDown) {
            try {
                poll(bucketId);
            } catch (Throwable pollFailure) {
                log.error(pollFailure.getMessage(), pollFailure);
                try {
                    closeKafkaConsumer(consumerBuckets.get(bucketId));
                } catch (Throwable closeFailure) {
                    log.error(closeFailure.getMessage(), closeFailure);
                }
                WaitUtils.sleepQuietly(tasksProperties.getGenericMediumDelay());
            }
        }
    }

    /**
     * Requests the polling jobs to stop: raises the shutdown flag they check and shuts down
     * the executor running them.
     */
    public void prepareForShutdown() {
        shuttingDown = true;
        executorService.shutdown();
    }

    /**
     * Tells whether all polling jobs have finished.
     *
     * @return true when the executor running the polling jobs has terminated
     */
    public boolean canShutdown() {
        return executorService.isTerminated();
    }
}
