package com.transferwise.tasks.triggering;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.transferwise.common.baseutils.ExceptionUtils;
import com.transferwise.common.baseutils.concurrency.LockUtils;
import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy;
import com.transferwise.tasks.ITasksService;
import com.transferwise.tasks.TasksProperties;
import com.transferwise.tasks.buckets.BucketProperties;
import com.transferwise.tasks.buckets.IBucketsManager;
import com.transferwise.tasks.dao.ITaskDao;
import com.transferwise.tasks.domain.BaseTask;
import com.transferwise.tasks.domain.TaskStatus;
import com.transferwise.tasks.entrypoints.IMdcService;
import com.transferwise.tasks.handler.interfaces.ITaskHandler;
import com.transferwise.tasks.handler.interfaces.ITaskHandlerRegistry;
import com.transferwise.tasks.handler.interfaces.ITaskProcessingPolicy;
import com.transferwise.tasks.helpers.ICoreMetricsTemplate;
import com.transferwise.tasks.helpers.IErrorLoggingThrottler;
import com.transferwise.tasks.helpers.executors.IExecutorsHelper;
import com.transferwise.tasks.helpers.kafka.ITopicPartitionsManager;
import com.transferwise.tasks.helpers.kafka.partitionkey.IPartitionKeyStrategy;
import com.transferwise.tasks.processing.GlobalProcessingState;
import com.transferwise.tasks.processing.ITasksProcessingService;
import com.transferwise.tasks.utils.JsonUtils;
import com.transferwise.tasks.utils.LogUtils;
import com.transferwise.tasks.utils.WaitUtils;
import com.vdurmont.semver4j.Semver;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ReassignmentInProgressException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.support.TransactionSynchronizationManager;

/* loaded from: input_file:com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.class */
public class KafkaTasksExecutionTriggerer implements ITasksExecutionTriggerer, GracefulShutdownStrategy, InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(KafkaTasksExecutionTriggerer.class);

    // NOTE(review): the two constants below are not used in the visible part of the file; presumably they
    // drive a retry loop in createKafkaProducer() - confirm.
    private static final int MAX_KAFKA_PRODUCER_INSTANTIATION_ATTEMPTS = 5;
    private static final int KAFKA_PRODUCER_INSTANTIATION_FAILURE_WAIT_TIME_MS = 500;

    @Autowired
    private ITasksProcessingService tasksProcessingService;

    @Autowired
    private ITaskDao taskDao;

    @Autowired
    private TasksProperties tasksProperties;

    @Autowired
    private ITaskHandlerRegistry taskHandlerRegistry;

    @Autowired
    private IExecutorsHelper executorsHelper;

    @Autowired
    private ITopicPartitionsManager topicPartitionsManager;

    @Autowired
    private GlobalProcessingState globalProcessingState;

    @Autowired
    private IBucketsManager bucketsManager;

    @Autowired
    private IErrorLoggingThrottler errorLoggingThrottler;

    @Autowired
    private IMdcService mdcService;

    @Autowired
    private ICoreMetricsTemplate coreMetricsTemplate;

    /** Default partition key strategy, used when a task's processing policy does not supply its own. */
    @Autowired
    private IPartitionKeyStrategy partitionKeyStrategy;

    /** JSON mapper configured in afterPropertiesSet() to ignore unknown properties. */
    private ObjectMapper objectMapper;

    /** Producer used by trigger() to publish task triggers; created in afterPropertiesSet(). */
    private KafkaProducer<String, String> kafkaProducer;

    /** Cached executor named "ktet", created in afterPropertiesSet(). */
    private ExecutorService executorService;

    /** Read by the polling loops to stop as soon as possible; volatile because it is read across threads. */
    private volatile boolean shuttingDown;

    /** Set in afterPropertiesSet() to "twTasks.&lt;groupId&gt;.executeTask". */
    private String triggerTopic;

    /** Consumer-side state per bucket id; entries are created lazily by getConsumerBucket(). */
    private final Map<String, ConsumerBucket> consumerBuckets = new ConcurrentHashMap<>();

    // NOTE(review): presumably keyed by bucket id like consumerBuckets - the accessor is outside this view.
    private final Map<String, ProcessingBucket> processingBuckets = new ConcurrentHashMap<>();

    /** Number of poll() loops currently running; exposed as a metric in afterPropertiesSet(). */
    private final AtomicInteger pollingBucketsCount = new AtomicInteger();

    private final Lock lifecycleLock = new ReentrantLock();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer$CommittingRebalanceListener.class */
    /**
     * Rebalance listener that first lets the enclosing triggerer handle revoked partitions for a bucket
     * (via {@code commitSyncOnPartitionsRevoked}) and then forwards the event to an optional delegate.
     */
    public class CommittingRebalanceListener implements ConsumerRebalanceListener {
        private String bucketId;
        private ConsumerRebalanceListener delegate;

        private CommittingRebalanceListener(String bucketId, ConsumerRebalanceListener delegate) {
            this.delegate = delegate;
            this.bucketId = bucketId;
        }

        /**
         * Invokes the triggerer's revocation handling for this bucket, then notifies the delegate, if any.
         *
         * @param collection the partitions being revoked
         */
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            KafkaTasksExecutionTriggerer.this.commitSyncOnPartitionsRevoked(this.bucketId, collection);
            if (this.delegate == null) {
                return;
            }
            this.delegate.onPartitionsRevoked(collection);
        }

        /**
         * Forwards the assignment to the delegate, if any; does nothing else.
         *
         * @param collection the partitions being assigned
         */
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            if (this.delegate == null) {
                return;
            }
            this.delegate.onPartitionsAssigned(collection);
        }
    }

    /* loaded from: input_file:com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer$ConsumerBucket.class */
    /**
     * Mutable per-bucket consumer state: the Kafka consumer and its metrics handle, the offsets seen per
     * topic partition, the offsets waiting to be committed and a few counters exposed as metrics.
     *
     * <p>All setters return {@code this} so that calls can be chained.
     */
    public static class ConsumerBucket {
        private String bucketId;
        private KafkaConsumer<String, String> kafkaConsumer;
        private AutoCloseable consumerMetricsHandle;
        private int unprocessedFetchedRecordsCount;
        private boolean topicConfigured;
        private long lastCommitTime = System.currentTimeMillis();
        private Map<TopicPartition, ConsumerTopicPartition> consumerTopicPartitions = new ConcurrentHashMap<>();
        private Lock offsetsStorageLock = new ReentrantLock();
        private Map<TopicPartition, OffsetAndMetadata> offsetsToBeCommitted = new ConcurrentHashMap<>();

        /** Returns the number of partitions that currently have an offset waiting to be committed. */
        public int getOffsetsToBeCommitedCount() {
            return this.offsetsToBeCommitted.size();
        }

        public int getUnprocessedFetchedRecordsCount() {
            return this.unprocessedFetchedRecordsCount;
        }

        /** Returns the total number of tracked offsets, summed over all topic partitions. */
        public int getOffsetsCount() {
            return this.consumerTopicPartitions.values().stream()
                .mapToInt(partition -> partition.getOffsets().size())
                .sum();
        }

        /** Returns the total number of entries in the completed-offsets maps, summed over all topic partitions. */
        public int getOffsetsCompletedCount() {
            return this.consumerTopicPartitions.values().stream()
                .mapToInt(partition -> partition.getOffsetsCompleted().size())
                .sum();
        }

        /**
         * Decrements the unprocessed-records counter by one.
         *
         * <p>This is a plain, non-atomic decrement of an {@code int} field.
         */
        public void decrementUnprocessedFetchedRecordsCount() {
            this.unprocessedFetchedRecordsCount--;
        }

        public String getBucketId() {
            return this.bucketId;
        }

        public long getLastCommitTime() {
            return this.lastCommitTime;
        }

        public KafkaConsumer<String, String> getKafkaConsumer() {
            return this.kafkaConsumer;
        }

        public AutoCloseable getConsumerMetricsHandle() {
            return this.consumerMetricsHandle;
        }

        public Map<TopicPartition, ConsumerTopicPartition> getConsumerTopicPartitions() {
            return this.consumerTopicPartitions;
        }

        public Lock getOffsetsStorageLock() {
            return this.offsetsStorageLock;
        }

        public Map<TopicPartition, OffsetAndMetadata> getOffsetsToBeCommitted() {
            return this.offsetsToBeCommitted;
        }

        public boolean isTopicConfigured() {
            return this.topicConfigured;
        }

        public ConsumerBucket setBucketId(String str) {
            this.bucketId = str;
            return this;
        }

        public ConsumerBucket setLastCommitTime(long j) {
            this.lastCommitTime = j;
            return this;
        }

        public ConsumerBucket setKafkaConsumer(KafkaConsumer<String, String> kafkaConsumer) {
            this.kafkaConsumer = kafkaConsumer;
            return this;
        }

        public ConsumerBucket setConsumerMetricsHandle(AutoCloseable autoCloseable) {
            this.consumerMetricsHandle = autoCloseable;
            return this;
        }

        public ConsumerBucket setConsumerTopicPartitions(Map<TopicPartition, ConsumerTopicPartition> map) {
            this.consumerTopicPartitions = map;
            return this;
        }

        public ConsumerBucket setOffsetsStorageLock(Lock lock) {
            this.offsetsStorageLock = lock;
            return this;
        }

        public ConsumerBucket setOffsetsToBeCommitted(Map<TopicPartition, OffsetAndMetadata> map) {
            this.offsetsToBeCommitted = map;
            return this;
        }

        public ConsumerBucket setUnprocessedFetchedRecordsCount(int i) {
            this.unprocessedFetchedRecordsCount = i;
            return this;
        }

        public ConsumerBucket setTopicConfigured(boolean z) {
            this.topicConfigured = z;
            return this;
        }

        /** Field-by-field equality over all nine properties; values are read through the getters. */
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ConsumerBucket)) {
                return false;
            }
            ConsumerBucket other = (ConsumerBucket) obj;
            return other.canEqual(this)
                && getLastCommitTime() == other.getLastCommitTime()
                && getUnprocessedFetchedRecordsCount() == other.getUnprocessedFetchedRecordsCount()
                && isTopicConfigured() == other.isTopicConfigured()
                && Objects.equals(getBucketId(), other.getBucketId())
                && Objects.equals(getKafkaConsumer(), other.getKafkaConsumer())
                && Objects.equals(getConsumerMetricsHandle(), other.getConsumerMetricsHandle())
                && Objects.equals(getConsumerTopicPartitions(), other.getConsumerTopicPartitions())
                && Objects.equals(getOffsetsStorageLock(), other.getOffsetsStorageLock())
                && Objects.equals(getOffsetsToBeCommitted(), other.getOffsetsToBeCommitted());
        }

        /** Hook that lets a subclass refuse equality with instances of this class. */
        protected boolean canEqual(Object obj) {
            return obj instanceof ConsumerBucket;
        }

        /** Hash over the same properties as {@link #equals(Object)}; 59 is the multiplier, 43 stands in for null. */
        public int hashCode() {
            int result = 1;
            result = result * 59 + Long.hashCode(getLastCommitTime());
            result = result * 59 + getUnprocessedFetchedRecordsCount();
            result = result * 59 + (isTopicConfigured() ? 79 : 97);
            result = result * 59 + hashOrNullMarker(getBucketId());
            result = result * 59 + hashOrNullMarker(getKafkaConsumer());
            result = result * 59 + hashOrNullMarker(getConsumerMetricsHandle());
            result = result * 59 + hashOrNullMarker(getConsumerTopicPartitions());
            result = result * 59 + hashOrNullMarker(getOffsetsStorageLock());
            result = result * 59 + hashOrNullMarker(getOffsetsToBeCommitted());
            return result;
        }

        private static int hashOrNullMarker(Object value) {
            return value == null ? 43 : value.hashCode();
        }

        /** Renders every property next to its own label (the labels and values were previously shifted by one). */
        public String toString() {
            return "KafkaTasksExecutionTriggerer.ConsumerBucket(bucketId=" + getBucketId()
                + ", lastCommitTime=" + getLastCommitTime()
                + ", kafkaConsumer=" + getKafkaConsumer()
                + ", consumerMetricsHandle=" + getConsumerMetricsHandle()
                + ", consumerTopicPartitions=" + getConsumerTopicPartitions()
                + ", offsetsStorageLock=" + getOffsetsStorageLock()
                + ", offsetsToBeCommitted=" + getOffsetsToBeCommitted()
                + ", unprocessedFetchedRecordsCount=" + getUnprocessedFetchedRecordsCount()
                + ", topicConfigured=" + isTopicConfigured() + ")";
        }
    }

    /* loaded from: input_file:com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer$ConsumerTopicPartition.class */
    /**
     * Offset bookkeeping for a single topic partition: a sorted set of tracked offsets and a map holding a
     * completion flag per offset.
     */
    public static class ConsumerTopicPartition {
        private TreeSet<Long> offsets = new TreeSet<>();
        private Map<Long, Boolean> offsetsCompleted = new HashMap<>();

        /**
         * Tells whether the given offset is flagged as completed.
         *
         * @param l the offset to look up
         * @return {@code true} only when the completion map holds {@code true} for the offset; a missing
         *     entry or a {@code null} flag counts as not done
         */
        public boolean isDone(Long l) {
            return Boolean.TRUE.equals(this.offsetsCompleted.get(l));
        }

        public TreeSet<Long> getOffsets() {
            return this.offsets;
        }

        public Map<Long, Boolean> getOffsetsCompleted() {
            return this.offsetsCompleted;
        }

        public ConsumerTopicPartition setOffsets(TreeSet<Long> treeSet) {
            this.offsets = treeSet;
            return this;
        }

        public ConsumerTopicPartition setOffsetsCompleted(Map<Long, Boolean> map) {
            this.offsetsCompleted = map;
            return this;
        }

        /** Two instances are equal when both their offsets and their completion maps are equal. */
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ConsumerTopicPartition)) {
                return false;
            }
            ConsumerTopicPartition that = (ConsumerTopicPartition) obj;
            return that.canEqual(this)
                && Objects.equals(getOffsets(), that.getOffsets())
                && Objects.equals(getOffsetsCompleted(), that.getOffsetsCompleted());
        }

        /** Hook that lets a subclass refuse equality with instances of this class. */
        protected boolean canEqual(Object obj) {
            return obj instanceof ConsumerTopicPartition;
        }

        /** Hash over both collections; 59 is the multiplier and 43 stands in for a null collection. */
        public int hashCode() {
            TreeSet<Long> trackedOffsets = getOffsets();
            Map<Long, Boolean> completionFlags = getOffsetsCompleted();
            int result = 59 + (trackedOffsets != null ? trackedOffsets.hashCode() : 43);
            return result * 59 + (completionFlags != null ? completionFlags.hashCode() : 43);
        }

        public String toString() {
            StringBuilder text = new StringBuilder("KafkaTasksExecutionTriggerer.ConsumerTopicPartition(offsets=");
            text.append(getOffsets());
            text.append(", offsetsCompleted=").append(getOffsetsCompleted());
            return text.append(")").toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer$ProcessingBucket.class */
    /**
     * Processing state of a bucket: the current {@link ITasksService.TasksProcessingState}, which starts as
     * {@code STOPPED}, and an optional future associated with stopping.
     */
    public static class ProcessingBucket {
        private ITasksService.TasksProcessingState state = ITasksService.TasksProcessingState.STOPPED;
        private CompletableFuture<Void> stopFuture;

        public ITasksService.TasksProcessingState getState() {
            return this.state;
        }

        public CompletableFuture<Void> getStopFuture() {
            return this.stopFuture;
        }

        public ProcessingBucket setState(ITasksService.TasksProcessingState tasksProcessingState) {
            this.state = tasksProcessingState;
            return this;
        }

        public ProcessingBucket setStopFuture(CompletableFuture<Void> completableFuture) {
            this.stopFuture = completableFuture;
            return this;
        }

        /** Two instances are equal when both their state and their stop future are equal. */
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ProcessingBucket)) {
                return false;
            }
            ProcessingBucket that = (ProcessingBucket) obj;
            return that.canEqual(this)
                && Objects.equals(getState(), that.getState())
                && Objects.equals(getStopFuture(), that.getStopFuture());
        }

        /** Hook that lets a subclass refuse equality with instances of this class. */
        protected boolean canEqual(Object obj) {
            return obj instanceof ProcessingBucket;
        }

        /** Hash over both properties; 59 is the multiplier and 43 stands in for a null property. */
        public int hashCode() {
            ITasksService.TasksProcessingState currentState = getState();
            CompletableFuture<Void> future = getStopFuture();
            int result = 59 + (currentState != null ? currentState.hashCode() : 43);
            return result * 59 + (future != null ? future.hashCode() : 43);
        }

        public String toString() {
            StringBuilder text = new StringBuilder("KafkaTasksExecutionTriggerer.ProcessingBucket(state=");
            text.append(getState());
            text.append(", stopFuture=").append(getStopFuture());
            return text.append(")").toString();
        }
    }

    /**
     * Initializes the triggerer once its dependencies are injected: creates the executor, derives the
     * trigger topic name from the configured group id, subscribes to "task triggering finished" events,
     * registers the polling-buckets metric, configures the JSON mapper and creates the Kafka producer.
     */
    public void afterPropertiesSet() {
        this.executorService = this.executorsHelper.newCachedExecutor("ktet");

        String groupId = this.tasksProperties.getGroupId();
        this.triggerTopic = "twTasks." + groupId + ".executeTask";

        // Triggers handled in the same process never came through Kafka, so they have no offset to release.
        this.tasksProcessingService.addTaskTriggeringFinishedListener(finished -> {
            if (!finished.isSameProcessTrigger()) {
                ConsumerBucket consumerBucket = this.consumerBuckets.get(finished.getBucketId());
                releaseCompletedOffset(consumerBucket, finished.getTopicPartition(), finished.getOffset());
            }
        });

        this.coreMetricsTemplate.registerPollingBucketsCount(this.pollingBucketsCount);

        // Unknown JSON properties are ignored instead of failing deserialization.
        this.objectMapper = new ObjectMapper();
        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        this.kafkaProducer = createKafkaProducer();
    }

    /**
     * Triggers execution of a task.
     *
     * <p>The task is marked as ERROR when no handler is registered for it or when its handler names a
     * bucket that is not configured. Otherwise, if the bucket allows same-process triggering, the task is
     * first offered directly to the processing service; when that is not enabled or does not return OK,
     * the trigger is sent to the bucket's Kafka topic.
     *
     * @param baseTask the task to trigger
     */
    @Override // com.transferwise.tasks.triggering.ITasksExecutionTriggerer
    public void trigger(BaseTask baseTask) {
        if (this.tasksProperties.isAssertionsEnabled()) {
            Preconditions.checkState(!TransactionSynchronizationManager.isActualTransactionActive());
        }

        ITaskHandler handler = this.taskHandlerRegistry.getTaskHandler(baseTask);
        if (handler == null) {
            log.error("Marking task {} as ERROR, because no task handler was found for type '" + baseTask.getType() + "'.", LogUtils.asParameter(baseTask.getVersionId()));
            this.coreMetricsTemplate.registerTaskMarkedAsError(null, baseTask.getType());
            boolean markedAsError = this.taskDao.setStatus(baseTask.getId(), TaskStatus.ERROR, baseTask.getVersion());
            if (!markedAsError) {
                this.coreMetricsTemplate.registerFailedStatusChange(baseTask.getType(), TaskStatus.UNKNOWN.name(), TaskStatus.ERROR);
                // The Throwable is only there to get a stack trace into the log.
                log.error("Marking task {} as ERROR failed, version may have changed.", LogUtils.asParameter(baseTask.getVersionId()), new Throwable());
            }
            return;
        }

        String bucketId = handler.getProcessingPolicy(baseTask).getProcessingBucket(baseTask);
        if (!this.bucketsManager.isConfiguredBucket(bucketId)) {
            log.error("Marking task {} as ERROR, because task handler has unknown bucket '{}'.", LogUtils.asParameter(baseTask.getVersionId()), bucketId);
            this.coreMetricsTemplate.registerTaskMarkedAsError(bucketId, baseTask.getType());
            boolean markedAsError = this.taskDao.setStatus(baseTask.getId(), TaskStatus.ERROR, baseTask.getVersion());
            if (!markedAsError) {
                this.coreMetricsTemplate.registerFailedStatusChange(baseTask.getType(), TaskStatus.UNKNOWN.name(), TaskStatus.ERROR);
                // The Throwable is only there to get a stack trace into the log.
                log.error("Marking task {} as ERROR failed, version may have changed.", LogUtils.asParameter(baseTask.getVersionId()), new Throwable());
            }
            return;
        }

        // Same-process shortcut: skip Kafka entirely when the processing service accepts the task.
        if (BooleanUtils.isTrue(this.bucketsManager.getBucketProperties(bucketId).getTriggerInSameProcess())) {
            TaskTriggering sameProcessTriggering = new TaskTriggering().setTask(baseTask).setBucketId(bucketId);
            if (this.tasksProcessingService.addTaskForProcessing(sameProcessTriggering).getResult() == ITasksProcessingService.AddTaskForProcessingResponse.ResultCode.OK) {
                return;
            }
        }

        this.kafkaProducer.send(new ProducerRecord(getTopic(bucketId), getPartitionKeyStrategy(handler, baseTask).createPartitionKey(baseTask), JsonUtils.toJson(this.objectMapper, baseTask)), (recordMetadata, exc) -> {
            if (exc != null) {
                // Failures are always logged at debug level, otherwise only as often as the throttler allows.
                if (log.isDebugEnabled() || this.errorLoggingThrottler.canLogError()) {
                    this.mdcService.with(() -> {
                        this.mdcService.put(baseTask);
                        log.error("Task {} triggering failed through Kafka.", LogUtils.asParameter(baseTask.getVersionId()), exc);
                    });
                }
                return;
            }
            if (log.isDebugEnabled()) {
                this.mdcService.with(() -> {
                    this.mdcService.put(baseTask);
                    log.debug("Task '{}' triggering acknowledged by Kafka.", baseTask.getVersionId());
                });
            }
        });
    }

    /**
     * Resolves the partition key strategy for a task.
     *
     * @param iTaskHandler the handler whose processing policy is consulted
     * @param baseTask the task being triggered
     * @return the strategy supplied by the task's processing policy, or this triggerer's injected default
     *     strategy when the policy is {@code null} or supplies none
     */
    public IPartitionKeyStrategy getPartitionKeyStrategy(ITaskHandler iTaskHandler, BaseTask baseTask) {
        ITaskProcessingPolicy policy = iTaskHandler.getProcessingPolicy(baseTask);
        if (policy != null && policy.getPartitionKeyStrategy() != null) {
            return policy.getPartitionKeyStrategy();
        }
        return this.partitionKeyStrategy;
    }

    /**
     * Returns the consumer bucket for the given bucket id, creating and completing it as needed.
     *
     * <p>On first use the bucket is stored in {@code consumerBuckets} and its four gauges are registered.
     * On every call the bucket is then completed lazily: the triggering topic's partition count is applied
     * once, and a Kafka consumer (plus its metrics handle) is created whenever the bucket has none.
     *
     * <p>NOTE(review): the lookup and the subsequent {@code put} are two separate steps, so two threads
     * asking for the same new bucket at the same time could each create one - confirm callers never do that.
     *
     * @param str the bucket id
     * @return the bucket, with topic configured and a Kafka consumer attached
     */
    public ConsumerBucket getConsumerBucket(String str) {
        return (ConsumerBucket) ExceptionUtils.doUnchecked(() -> {
            ConsumerBucket consumerBucket = this.consumerBuckets.get(str);
            if (consumerBucket == null) {
                ConsumerBucket created = new ConsumerBucket().setBucketId(str);
                this.consumerBuckets.put(str, created);
                this.coreMetricsTemplate.registerKafkaTasksExecutionTriggererOffsetsToBeCommitedCount(str, created::getOffsetsToBeCommitedCount);
                this.coreMetricsTemplate.registerKafkaTasksExecutionTriggererOffsetsCompletedCount(str, created::getOffsetsCompletedCount);
                this.coreMetricsTemplate.registerKafkaTasksExecutionTriggererUnprocessedFetchedRecordsCount(str, created::getUnprocessedFetchedRecordsCount);
                this.coreMetricsTemplate.registerKafkaTasksExecutionTriggererOffsetsCount(str, created::getOffsetsCount);
                consumerBucket = created;
            }

            BucketProperties bucketProperties = this.bucketsManager.getBucketProperties(str);
            if (!consumerBucket.isTopicConfigured()) {
                this.topicPartitionsManager.setPartitionsCount(getTopic(str), bucketProperties.getTriggeringTopicPartitionsCount().intValue());
                consumerBucket.setTopicConfigured(true);
            }

            if (consumerBucket.getKafkaConsumer() == null) {
                // The new consumer is read back through the typed getter instead of being held in a raw
                // Consumer local, which would not be assignable to setKafkaConsumer's parameter.
                consumerBucket.setKafkaConsumer(createKafkaConsumer(str, bucketProperties));
                consumerBucket.setConsumerMetricsHandle(this.coreMetricsTemplate.registerKafkaConsumer(consumerBucket.getKafkaConsumer()));
            }
            return consumerBucket;
        });
    }

    /**
     * Polling loop for one bucket: consumes task triggers from Kafka and hands them to the processing
     * service until shutdown starts or the bucket's processing state is no longer STARTED.
     *
     * <p>For every record the offset is registered, the payload is deserialized into a {@link BaseTask}
     * and the task is offered to the processing service. While the service answers FULL, the loop waits
     * for the bucket's version to change and retries. On exit the polling counter is decremented and the
     * bucket's Kafka consumer is closed.
     *
     * @param str the bucket id
     */
    public void poll(String str) {
        log.info("Started to listen tasks triggers in bucket '" + str + "'.");
        try {
            this.pollingBucketsCount.incrementAndGet();
            ConsumerBucket consumerBucket = getConsumerBucket(str);
            GlobalProcessingState.Bucket bucket = this.globalProcessingState.getBuckets().get(str);
            while (!this.shuttingDown && getProcessingBucket(str).getState() == ITasksService.TasksProcessingState.STARTED) {
                try {
                    ConsumerRecords<String, String> records = consumerBucket.getKafkaConsumer().poll(this.tasksProperties.getGenericMediumDelay());
                    commitOffsetsWithLowFrequency(consumerBucket);
                    consumerBucket.setUnprocessedFetchedRecordsCount(records.count());
                    for (ConsumerRecord<String, String> consumerRecord : records) {
                        TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                        long offset = consumerRecord.offset();
                        registerPolledOffset(consumerBucket, topicPartition, offset);
                        log.debug("Received Kafka message from topic '{}' partition {} offset {}.", consumerRecord.topic(), consumerRecord.partition(), offset);
                        BaseTask baseTask = (BaseTask) JsonUtils.fromJson(this.objectMapper, consumerRecord.value(), BaseTask.class);
                        this.mdcService.with(() -> {
                            this.mdcService.put(baseTask);
                            TaskTriggering taskTriggering = new TaskTriggering().setTask(baseTask).setBucketId(str).setOffset(offset).setTopicPartition(topicPartition);
                            this.coreMetricsTemplate.registerKafkaTasksExecutionTriggererTriggersReceive(str);
                            log.debug("Adding task '{}' for processing.", baseTask.getVersionId());
                            while (!this.shuttingDown) {
                                // Read the version before offering the task, so a change that happens
                                // between the offer and the wait below is not missed.
                                long observedVersion = bucket.getVersion().get();
                                if (this.tasksProcessingService.addTaskForProcessing(taskTriggering).getResult() != ITasksProcessingService.AddTaskForProcessingResponse.ResultCode.FULL) {
                                    break;
                                }
                                awaitProcessingBucketVersionChange(bucket, observedVersion);
                            }
                            consumerBucket.decrementUnprocessedFetchedRecordsCount();
                        });
                    }
                } catch (WakeupException ignored) {
                    // Deliberately ignored: the loop condition is re-evaluated right after and decides
                    // whether polling continues.
                }
            }
        } finally {
            this.pollingBucketsCount.decrementAndGet();
            closeKafkaConsumer(this.consumerBuckets.get(str));
        }
    }

    /**
     * Blocks until the bucket's version differs from {@code observedVersion} or shutdown starts.
     *
     * <p>The version lock is held for the whole wait and released exactly once, also when the loop body
     * never runs.
     */
    private void awaitProcessingBucketVersionChange(GlobalProcessingState.Bucket bucket, long observedVersion) {
        Lock versionLock = bucket.getVersionLock();
        versionLock.lock();
        try {
            while (bucket.getVersion().get() == observedVersion && !this.shuttingDown) {
                try {
                    bucket.getVersionCondition().await(this.tasksProperties.getGenericMediumDelay().toMillis(), TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Kept as log-and-continue: restoring the interrupt flag here would make the next
                    // await() throw immediately and turn this loop into a busy spin.
                    log.error(e.getMessage(), e);
                }
            }
        } finally {
            versionLock.unlock();
        }
    }

    /**
     * Starts tracking an offset that was just polled from the given topic partition.
     *
     * <p>The per-partition tracking entry is created on first use. Then, while holding the bucket's offsets storage
     * lock, any completion mark left for the same offset is cleared and the offset is added to the tracked set.
     *
     * @param consumerBucket bucket whose offset bookkeeping is updated
     * @param topicPartition partition the offset belongs to
     * @param j the polled offset
     */
    void registerPolledOffset(ConsumerBucket consumerBucket, TopicPartition topicPartition, long j) {
        Map<TopicPartition, ConsumerTopicPartition> partitions = consumerBucket.getConsumerTopicPartitions();
        ConsumerTopicPartition tracked = partitions.get(topicPartition);
        if (tracked == null) {
            tracked = new ConsumerTopicPartition();
            partitions.put(topicPartition, tracked);
        }

        Long offset = j;
        consumerBucket.getOffsetsStorageLock().lock();
        try {
            tracked.getOffsetsCompleted().remove(offset);
            tracked.getOffsets().add(offset);
        } finally {
            consumerBucket.getOffsetsStorageLock().unlock();
        }
    }

    /**
     * Marks a previously registered offset as completed and advances the bucket's commit position as far as possible.
     *
     * <p>All work happens under the bucket's offsets storage lock. If the offset is no longer tracked, a metric is
     * recorded and nothing else changes. Otherwise the offset is marked completed; when it is the lowest tracked
     * offset, every leading offset that is already done is removed and {@code offset + 1} is stored as the position
     * to be committed for the partition.
     *
     * @param consumerBucket bucket whose offset bookkeeping is updated
     * @param topicPartition partition the offset belongs to
     * @param j the offset whose processing has finished
     */
    void releaseCompletedOffset(ConsumerBucket consumerBucket, TopicPartition topicPartition, long j) {
        consumerBucket.getOffsetsStorageLock().lock();
        try {
            ConsumerTopicPartition consumerTopicPartition = consumerBucket.getConsumerTopicPartitions().get(topicPartition);
            TreeSet<Long> offsets = consumerTopicPartition.getOffsets();
            if (!offsets.contains(j)) {
                this.coreMetricsTemplate.registerKafkaTasksExecutionTriggererAlreadyCommitedOffset(consumerBucket.getBucketId());
                log.debug("Offset {} has already been commited.", j);
                // The lock is released by the finally block; unlocking here as well would release it twice.
                return;
            }

            consumerTopicPartition.getOffsetsCompleted().put(j, Boolean.TRUE);

            // Only the lowest tracked offset can move the commit position forward.
            if (offsets.first() == j) {
                while (!offsets.isEmpty()) {
                    Long head = offsets.first();
                    if (!consumerTopicPartition.isDone(head)) {
                        break;
                    }
                    // Kafka expects the offset of the next record to consume, hence +1.
                    consumerBucket.getOffsetsToBeCommitted().put(topicPartition, new OffsetAndMetadata(head + 1));
                    offsets.pollFirst();
                    consumerTopicPartition.getOffsetsCompleted().remove(head);
                }
            }
        } finally {
            consumerBucket.getOffsetsStorageLock().unlock();
        }
    }

    /**
     * Synchronously commits the given offsets through the bucket's Kafka consumer.
     *
     * <p>Does nothing for an empty map. A {@code WakeupException} from the first attempt is answered with exactly one
     * retry. Any other failure is passed to {@link #registerCommitException(String, Throwable)} and not rethrown.
     * The commit metric is recorded exactly once per call, flagged with whether the commit succeeded.
     *
     * @param consumerBucket bucket owning the Kafka consumer
     * @param map offsets to commit, keyed by topic partition
     */
    private void commitSync(ConsumerBucket consumerBucket, Map<TopicPartition, OffsetAndMetadata> map) {
        if (map.isEmpty()) {
            return;
        }
        String bucketId = consumerBucket.getBucketId();
        boolean success = false;
        try {
            if (log.isDebugEnabled()) {
                log.debug("Sync-committing bucket '" + bucketId + "' offsets to Kafka: " + map.entrySet().stream()
                        .map(entry -> entry.getKey() + ":" + entry.getValue().offset())
                        .collect(Collectors.joining(", ")));
            }
            try {
                consumerBucket.getKafkaConsumer().commitSync(map);
            } catch (WakeupException ignored) {
                // A wakeup interrupted the commit; try once more so the offsets are not dropped.
                consumerBucket.getKafkaConsumer().commitSync(map);
            }
            success = true;
        } catch (Throwable th) {
            registerCommitException(bucketId, th);
        } finally {
            // Single place for the metric, so a failure is never counted twice.
            this.coreMetricsTemplate.registerKafkaTasksExecutionTriggererCommit(bucketId, true, success);
        }
    }

    /**
     * Synchronously commits whatever offsets are pending for the given partitions.
     *
     * <p>Pending offsets for each listed partition are removed from the bucket's to-be-committed map and handed to
     * {@link #commitSync(ConsumerBucket, Map)} in one batch.
     *
     * @param str bucket id
     * @param collection partitions whose pending offsets should be committed
     */
    private void commitSyncOnPartitionsRevoked(String str, Collection<TopicPartition> collection) {
        ConsumerBucket bucket = getConsumerBucket(str);
        Map<TopicPartition, OffsetAndMetadata> pendingForRevoked = new HashMap<>();
        for (TopicPartition partition : collection) {
            OffsetAndMetadata pending = bucket.getOffsetsToBeCommitted().remove(partition);
            if (pending == null) {
                continue;
            }
            pendingForRevoked.put(partition, pending);
        }
        commitSync(bucket, pendingForRevoked);
    }

    /**
     * Asynchronously commits the bucket's pending offsets, at most once per configured commit interval.
     *
     * <p>Returns without doing anything when the interval since the last commit has not elapsed or when there is
     * nothing to commit. Otherwise the pending offsets are passed to {@code commitAsync}; the outcome is reported via
     * the commit metric in the callback, and failures go to {@link #registerCommitException(String, Throwable)}.
     * The pending map is cleared and the last commit time updated regardless of whether the call threw.
     *
     * @param consumerBucket bucket whose pending offsets are committed
     */
    private void commitOffsetsWithLowFrequency(ConsumerBucket consumerBucket) {
        long sinceLastCommitMs = System.currentTimeMillis() - consumerBucket.getLastCommitTime();
        if (sinceLastCommitMs < this.tasksProperties.getTriggersCommitInterval().toMillis()) {
            return;
        }

        String bucketId = consumerBucket.getBucketId();
        if (consumerBucket.getOffsetsToBeCommitted().isEmpty()) {
            return;
        }

        try {
            if (log.isDebugEnabled()) {
                String offsetsDescription = consumerBucket.getOffsetsToBeCommitted().entrySet().stream()
                        .map(entry -> entry.getKey() + ":" + entry.getValue().offset())
                        .collect(Collectors.joining(", "));
                log.debug("Async-committing bucket '" + bucketId + "' offsets to Kafka: " + offsetsDescription);
            }
            consumerBucket.getKafkaConsumer().commitAsync(consumerBucket.getOffsetsToBeCommitted(), (committed, failure) -> {
                boolean succeeded = failure == null;
                this.coreMetricsTemplate.registerKafkaTasksExecutionTriggererCommit(bucketId, false, succeeded);
                if (!succeeded) {
                    registerCommitException(bucketId, failure);
                }
            });
        } catch (Throwable th) {
            registerCommitException(bucketId, th);
        }

        consumerBucket.getOffsetsToBeCommitted().clear();
        consumerBucket.setLastCommitTime(System.currentTimeMillis());
    }

    /**
     * Logs a failed offset commit at a level chosen by the exception type.
     *
     * <p>Rebalance-in-progress, reassignment-in-progress, commit-failed and retriable exceptions are logged at debug
     * level only. Everything else is logged as an error, provided the error logging throttler allows it.
     *
     * @param str bucket id the commit was made for
     * @param th the failure
     */
    protected void registerCommitException(String str, Throwable th) {
        boolean lowSeverity = th instanceof RebalanceInProgressException
                || th instanceof ReassignmentInProgressException
                || th instanceof CommitFailedException
                || th instanceof RetriableException;

        if (lowSeverity) {
            if (log.isDebugEnabled()) {
                log.debug("Committing Kafka offset failed for bucket '" + str + "'.", th);
            }
            return;
        }

        if (this.errorLoggingThrottler.canLogError()) {
            log.error("Committing Kafka offset failed for bucket '" + str + "'.", th);
        }
    }

    /**
     * Builds the Kafka producer from built-in defaults plus the configured Kafka properties.
     *
     * <p>Configured properties are applied last and therefore override the defaults set here. Construction is retried
     * on {@code KafkaException}, with a 500 ms pause between attempts, until
     * {@code MAX_KAFKA_PRODUCER_INSTANTIATION_ATTEMPTS} attempts have been made; the last failure is rethrown.
     * The created producer is registered with the core metrics template.
     *
     * @return the newly created producer
     */
    private KafkaProducer<String, String> createKafkaProducer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", this.tasksProperties.getTriggering().getKafka().getBootstrapServers());
        configs.put("key.serializer", StringSerializer.class);
        configs.put("value.serializer", StringSerializer.class);
        configs.put("acks", "all");
        // NOTE(review): the instantiation-attempts constant doubles as the max in-flight requests value here;
        // confirm the two are meant to be coupled.
        configs.put("max.in.flight.requests.per.connection", Integer.valueOf(MAX_KAFKA_PRODUCER_INSTANTIATION_ATTEMPTS));
        configs.put("max.block.ms", "5000");
        configs.put("enable.idempotence", "true");
        configs.put("request.timeout.ms", "5000");
        configs.put("delivery.timeout.ms", "10000");
        configs.put("linger.ms", "5");
        configs.put("client.id", this.tasksProperties.getGroupId() + ".tw-tasks-triggerer");
        configs.put("reconnect.backoff.max.ms", "5000");
        configs.put("reconnect.backoff.ms", "100");
        configs.put("metadata.max.age.ms", "120000");
        configs.putAll(this.tasksProperties.getTriggering().getKafka().getProperties());

        KafkaProducer<String, String> producer = null;
        int attempt = 0;
        do {
            attempt++;
            try {
                producer = new KafkaProducer<>(configs);
            } catch (KafkaException e) {
                if (attempt >= MAX_KAFKA_PRODUCER_INSTANTIATION_ATTEMPTS) {
                    throw e;
                }
                log.error("Creating Kafka producer failed. Attempt #{}", attempt, e);
                WaitUtils.sleepQuietly(Duration.ofMillis(500L));
            }
        } while (producer == null);

        this.coreMetricsTemplate.registerKafkaProducer(producer);
        return producer;
    }

    /**
     * Creates a Kafka consumer for a bucket and subscribes it to the bucket's trigger topics.
     *
     * <p>The consumer group is the configured group id; when the bucket is set to trigger the same task on all nodes,
     * the client id is appended so that each node gets its own group. On {@code kafka-clients} 3.0.0 or newer the
     * cooperative sticky assignor (with range assignor as second choice) is configured, otherwise a warning is
     * logged. Configured Kafka properties override the defaults set here. {@code auto.offset.reset} is removed when
     * the bucket defines an auto-reset duration (a seek-to-duration listener is installed instead) and set from the
     * tasks properties otherwise.
     *
     * @param str bucket id
     * @param bucketProperties properties of that bucket
     * @return the subscribed consumer
     */
    private KafkaConsumer<String, String> createKafkaConsumer(String str, BucketProperties bucketProperties) {
        String consumerGroup = this.tasksProperties.getGroupId();
        if (Boolean.TRUE.equals(bucketProperties.getTriggerSameTaskInAllNodes())) {
            log.info("Using same task triggering on all nodes strategy for bucket '{}'.", str);
            consumerGroup = consumerGroup + "." + this.tasksProperties.getClientId();
        }

        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", this.tasksProperties.getTriggering().getKafka().getBootstrapServers());
        configs.put("max.poll.records", bucketProperties.getTriggersFetchSize());
        configs.put("group.id", consumerGroup);
        configs.put("enable.auto.commit", false);
        configs.put("client.id", this.tasksProperties.getClientId() + ".tw-tasks.bucket." + str);
        configs.put("key.deserializer", StringDeserializer.class);
        configs.put("value.deserializer", StringDeserializer.class);
        configs.put("reconnect.backoff.max.ms", "5000");
        configs.put("reconnect.backoff.ms", "100");
        configs.put("session.timeout.ms", "10000");

        try {
            String clientVersion = AppInfoParser.getVersion();
            if (new Semver(clientVersion).isGreaterThanOrEqualTo("3.0.0")) {
                String assignors = CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName();
                configs.put("partition.assignment.strategy", assignors);
            } else {
                log.warn("`kafka-clients:3+` is highly recommended to minimize re-balancing pauses. Current `kafka-clients` version is `{}`.", clientVersion);
            }
        } catch (Exception e) {
            log.error("Could not understand Kafka client version.", e);
        }

        // Applied after the defaults so that configured properties win.
        configs.putAll(this.tasksProperties.getTriggering().getKafka().getProperties());

        boolean seekToDuration = bucketProperties.getAutoResetOffsetToDuration() != null;
        if (seekToDuration) {
            configs.remove("auto.offset.reset");
        } else {
            configs.put("auto.offset.reset", this.tasksProperties.getAutoResetOffsetTo());
        }

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        List<String> topics = getTopics(str);
        log.info("Subscribing to Kafka topics '{}'", topics);
        if (seekToDuration) {
            consumer.subscribe(topics, new CommittingRebalanceListener(str,
                    new SeekToDurationOnRebalanceListener(consumer, bucketProperties.getAutoResetOffsetToDuration())));
        } else {
            consumer.subscribe(topics, new CommittingRebalanceListener(str, null));
        }
        return consumer;
    }

    /**
     * Releases the Kafka resources held by a consumer bucket; a {@code null} bucket is ignored.
     *
     * <p>The metrics handle, if any, is closed and cleared first. If the bucket has a consumer, pending offsets are
     * passed to {@link #commitOffsetsWithLowFrequency(ConsumerBucket)} (which skips the commit when the commit
     * interval has not elapsed), then the consumer is unsubscribed and closed. Failures of the individual steps are
     * logged and do not prevent the remaining steps. Finally the bucket's consumer reference is cleared.
     *
     * @param consumerBucket bucket to clean up, may be {@code null}
     */
    private void closeKafkaConsumer(ConsumerBucket consumerBucket) {
        if (consumerBucket == null) {
            return;
        }

        AutoCloseable metricsHandle = consumerBucket.getConsumerMetricsHandle();
        if (metricsHandle != null) {
            try {
                metricsHandle.close();
            } catch (Throwable closeFailure) {
                log.error("Closing Kafka consumer metrics handle failed.", closeFailure);
            }
            consumerBucket.setConsumerMetricsHandle(null);
        }

        KafkaConsumer<String, String> consumer = consumerBucket.getKafkaConsumer();
        if (consumer == null) {
            return;
        }

        commitOffsetsWithLowFrequency(consumerBucket);

        try {
            consumer.unsubscribe();
        } catch (Throwable unsubscribeFailure) {
            log.error("Unsubscribing kafka consumer failed.", unsubscribeFailure);
        }

        try {
            consumer.close();
        } catch (Throwable closeFailure) {
            log.error(closeFailure.getMessage(), closeFailure);
        }

        log.info("Closed Kafka consumer for bucket '" + consumerBucket.getBucketId() + "'.");
        consumerBucket.setKafkaConsumer(null);
    }

    /**
     * Builds the trigger topic name for a bucket.
     *
     * <p>Starts from the base trigger topic, appends {@code "." + bucketId} when the bucket id is non-empty, and
     * prepends {@code namespace + "."} when a Kafka topics namespace is configured.
     *
     * @param str bucket id, may be empty or {@code null}
     * @return the topic name
     */
    private String getTopic(String str) {
        String topic = StringUtils.isNotEmpty(str) ? this.triggerTopic + "." + str : this.triggerTopic;
        String namespace = this.tasksProperties.getKafkaTopicsNamespace();
        return StringUtils.isNotEmpty(namespace) ? namespace + "." + topic : topic;
    }

    /**
     * Lists every topic the bucket's consumer should subscribe to.
     *
     * <p>The list always starts with the bucket's own topic, followed by one entry per configured data-centre
     * prefix, formed as {@code prefix + topic}. When no prefixes are configured only the bucket's own topic is
     * returned.
     *
     * @param str bucket id
     * @return mutable list of topic names, never empty
     */
    private List<String> getTopics(String str) {
        List<String> topics = new ArrayList<>();
        String topic = getTopic(str);
        topics.add(topic);

        // StringUtils.split returns null for a null input, which would otherwise break the loop below.
        String[] dataCenterPrefixes = StringUtils.split(this.tasksProperties.getKafkaDataCenterPrefixes(), ",");
        if (dataCenterPrefixes != null) {
            for (String dataCenterPrefix : dataCenterPrefixes) {
                topics.add(dataCenterPrefix + topic);
            }
        }
        return topics;
    }

    /**
     * Starts trigger processing for every bucket that has auto-start enabled and is currently stopped.
     *
     * <p>Runs under the lifecycle lock.
     */
    public void applicationStarted() {
        LockUtils.withLock(this.lifecycleLock, () -> {
            for (String bucketId : this.bucketsManager.getBucketIds()) {
                boolean autoStart = Boolean.TRUE.equals(this.bucketsManager.getBucketProperties(bucketId).getAutoStartProcessing());
                if (!autoStart) {
                    continue;
                }
                if (getProcessingBucket(bucketId).getState() != ITasksService.TasksProcessingState.STOPPED) {
                    continue;
                }
                startBucketProcessing(bucketId);
            }
        });
    }

    /**
     * Marks the bucket as started and submits its polling loop to the executor service.
     *
     * <p>The loop keeps calling {@code poll} while the triggerer is not shutting down and the bucket state is
     * {@code STARTED}. When {@code poll} throws, the error is logged, the bucket's Kafka consumer is closed and the
     * loop pauses for the generic medium delay before trying again. Once the loop ends, the bucket is set to
     * {@code STOPPED} under the lifecycle lock and a pending stop future, if any, is completed and cleared.
     *
     * @param str bucket id
     */
    private void startBucketProcessing(String str) {
        getProcessingBucket(str).setState(ITasksService.TasksProcessingState.STARTED);

        Runnable pollingLoop = () -> {
            while (!this.shuttingDown && getProcessingBucket(str).getState() == ITasksService.TasksProcessingState.STARTED) {
                try {
                    poll(str);
                } catch (Throwable pollFailure) {
                    log.error(pollFailure.getMessage(), pollFailure);
                    try {
                        closeKafkaConsumer(this.consumerBuckets.get(str));
                    } catch (Throwable closeFailure) {
                        log.error(closeFailure.getMessage(), closeFailure);
                    }
                    WaitUtils.sleepQuietly(this.tasksProperties.getGenericMediumDelay());
                }
            }

            LockUtils.withLock(this.lifecycleLock, () -> {
                ProcessingBucket bucket = getProcessingBucket(str);
                bucket.setState(ITasksService.TasksProcessingState.STOPPED);
                if (bucket.getStopFuture() != null) {
                    bucket.getStopFuture().complete(null);
                }
                bucket.setStopFuture(null);
                log.info("Stopped triggers processing for bucket '" + str + "'.");
            });
        };
        this.executorService.submit(pollingLoop);

        log.info("Started triggers processing for bucket '" + str + "'.");
    }

    /**
     * Starts trigger processing for a bucket if it is currently stopped.
     *
     * <p>A {@code null} bucket id refers to the default bucket. The state check and the start happen under the
     * lifecycle lock.
     *
     * @param str bucket id, or {@code null} for the default bucket
     */
    @Override // com.transferwise.tasks.triggering.ITasksExecutionTriggerer
    public void startTasksProcessing(String str) {
        final String bucketId;
        if (str == null) {
            bucketId = IBucketsManager.DEFAULT_ID;
        } else {
            bucketId = str;
        }

        LockUtils.withLock(this.lifecycleLock, () -> {
            boolean stopped = getProcessingBucket(bucketId).getState() == ITasksService.TasksProcessingState.STOPPED;
            if (stopped) {
                startBucketProcessing(bucketId);
            }
        });
    }

    /**
     * Requests that trigger processing for a bucket stops.
     *
     * <p>A {@code null} bucket id refers to the default bucket, consistently with
     * {@link #startTasksProcessing(String)}. If the bucket is not in {@code STARTED} state, an already completed
     * future is returned. Otherwise the bucket moves to {@code STOP_IN_PROGRESS}, the returned future is stored on
     * the bucket as its stop future, and the bucket's Kafka consumer, if present, is woken up.
     *
     * @param str bucket id, or {@code null} for the default bucket
     * @return future tied to the stop request; already completed when the bucket was not started
     */
    @Override // com.transferwise.tasks.triggering.ITasksExecutionTriggerer
    public Future<Void> stopTasksProcessing(String str) {
        // Resolve the default bucket up front so that both lookups below use the same, non-null key.
        final String bucketId = str == null ? IBucketsManager.DEFAULT_ID : str;
        return (Future<Void>) LockUtils.withLock(this.lifecycleLock, () -> {
            ProcessingBucket processingBucket = getProcessingBucket(bucketId);
            CompletableFuture<Void> stopFuture = new CompletableFuture<>();
            if (processingBucket.getState() != ITasksService.TasksProcessingState.STARTED) {
                stopFuture.complete(null);
                return stopFuture;
            }
            processingBucket.setStopFuture(stopFuture);
            processingBucket.setState(ITasksService.TasksProcessingState.STOP_IN_PROGRESS);

            ConsumerBucket consumerBucket = this.consumerBuckets.get(bucketId);
            if (consumerBucket != null) {
                KafkaConsumer<String, String> kafkaConsumer = consumerBucket.getKafkaConsumer();
                if (kafkaConsumer != null) {
                    // Breaks the consumer out of a blocking call so the polling loop can see the new state.
                    kafkaConsumer.wakeup();
                }
            }
            return stopFuture;
        });
    }

    /**
     * Returns the current processing state of a bucket.
     *
     * @param str bucket id, or {@code null} for the default bucket
     * @return the bucket's processing state
     */
    @Override // com.transferwise.tasks.triggering.ITasksExecutionTriggerer
    public ITasksService.TasksProcessingState getTasksProcessingState(String str) {
        ProcessingBucket bucket = getProcessingBucket(str);
        return bucket.getState();
    }

    /**
     * Begins shutdown: sets the shutting-down flag, requests a stop for every bucket and shuts down the executor
     * service so that no new work is accepted.
     */
    public void prepareForShutdown() {
        this.shuttingDown = true;
        for (String bucketId : this.bucketsManager.getBucketIds()) {
            stopTasksProcessing(bucketId);
        }
        this.executorService.shutdown();
    }

    /**
     * Tells whether shutdown may proceed.
     *
     * @return {@code true} once the executor service reports that it has terminated
     */
    public boolean canShutdown() {
        boolean executorTerminated = this.executorService.isTerminated();
        return executorTerminated;
    }

    /**
     * Looks up the processing bucket for an id, creating it on first access.
     *
     * @param str bucket id, or {@code null} for the default bucket
     * @return the existing or newly created processing bucket
     */
    private ProcessingBucket getProcessingBucket(String str) {
        String bucketId = str;
        if (bucketId == null) {
            bucketId = IBucketsManager.DEFAULT_ID;
        }
        return this.processingBuckets.computeIfAbsent(bucketId, ignored -> new ProcessingBucket());
    }
}
