package com.transferwise.tasks.helpers.kafka;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.transferwise.tasks.helpers.IErrorLoggingThrottler;
import com.transferwise.tasks.helpers.IMeterHelper;
import com.transferwise.tasks.utils.WaitUtils;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/transferwise/tasks/helpers/kafka/ConsistentKafkaConsumer.class */
public class ConsistentKafkaConsumer<T> {
    private static final Logger log = LoggerFactory.getLogger(ConsistentKafkaConsumer.class);
    private List<String> topics;
    private Supplier<Map<String, Object>> kafkaPropertiesSupplier;
    private Supplier<Boolean> shouldFinishPredicate;
    private Supplier<Boolean> shouldPollPredicate;
    private Consumer<ConsumerRecord<String, T>> recordConsumer;
    private Duration delayTimeout = Duration.ofSeconds(5);
    private IMeterHelper meterHelper;
    private IErrorLoggingThrottler errorLoggingThrottler;

    public void consume() {
        MutableObject<org.apache.kafka.clients.consumer.Consumer<String, T>> mutableObject = new MutableObject<>();
        while (!this.shouldFinishPredicate.get().booleanValue()) {
            try {
                try {
                    if (mutableObject.getValue() == null) {
                        HashMap hashMap = new HashMap(this.kafkaPropertiesSupplier.get());
                        hashMap.put("enable.auto.commit", false);
                        mutableObject.setValue(new KafkaConsumer(hashMap));
                        ((org.apache.kafka.clients.consumer.Consumer) mutableObject.getValue()).subscribe(this.topics);
                    }
                    if (this.shouldPollPredicate.get().booleanValue()) {
                        log.debug("Polling from Kafka.");
                        handlePolledKafkaMessages(mutableObject, ((org.apache.kafka.clients.consumer.Consumer) mutableObject.getValue()).poll(this.delayTimeout));
                    }
                } catch (Throwable th) {
                    log.error(th.getMessage(), th);
                    WaitUtils.sleepQuietly(this.delayTimeout);
                }
            } finally {
                close(mutableObject);
            }
        }
    }

    @Trace(dispatcher = true)
    protected void handlePolledKafkaMessages(MutableObject<org.apache.kafka.clients.consumer.Consumer<String, T>> mutableObject, ConsumerRecords<String, T> consumerRecords) {
        NewRelic.setTransactionName("TwTasksEngine", "HandlePolledKafkaMessages");
        if (log.isDebugEnabled()) {
            log.debug("Polled " + consumerRecords.count() + " messages from Kafka.");
        }
        if (consumerRecords.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("No records found for: " + StringUtils.join(((org.apache.kafka.clients.consumer.Consumer) mutableObject.getValue()).subscription(), ", "));
                return;
            }
            return;
        }
        HashMap hashMap = new HashMap();
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord<String, T> consumerRecord = (ConsumerRecord) it.next();
            if (log.isDebugEnabled()) {
                log.debug("Received Kafka message from topic '{}', partition {}, offset {}.", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset())});
            }
            TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
            while (!this.shouldFinishPredicate.get().booleanValue()) {
                try {
                    this.recordConsumer.accept(consumerRecord);
                    hashMap.put(topicPartition, Long.valueOf(consumerRecord.offset() + 1));
                    break;
                } catch (Throwable th) {
                    log.error("Accepting Kafka message from topic '" + consumerRecord.topic() + "', partition " + consumerRecord.partition() + ", offset " + consumerRecord.offset() + " failed.", th);
                    WaitUtils.sleepQuietly(this.delayTimeout);
                }
            }
        }
        if (hashMap.isEmpty()) {
            return;
        }
        Map map = (Map) hashMap.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return new OffsetAndMetadata(((Long) entry.getValue()).longValue());
        }));
        if (!this.shouldFinishPredicate.get().booleanValue()) {
            ((org.apache.kafka.clients.consumer.Consumer) mutableObject.getValue()).commitAsync(map, (map2, exc) -> {
                if (exc == null) {
                    log.debug("Committing offsets completed.");
                } else {
                    registerCommitException(exc);
                }
            });
            return;
        }
        try {
            ((org.apache.kafka.clients.consumer.Consumer) mutableObject.getValue()).commitSync(map);
        } catch (Throwable th2) {
            registerCommitException(th2);
        }
    }

    protected void registerCommitException(Throwable th) {
        if ((th instanceof CommitFailedException) || (th instanceof RetriableException)) {
            if (this.meterHelper != null) {
                this.meterHelper.incrementCounter("twTasks.consistentKafkaConsumer.failedCommitsCount", 1L);
            }
            log.debug("Committing Kafka offset failed.", th);
        } else if (this.errorLoggingThrottler == null || this.errorLoggingThrottler.canLogError()) {
            log.error("Committing Kafka offset failed.", th);
        }
    }

    private void close(MutableObject<org.apache.kafka.clients.consumer.Consumer<String, T>> mutableObject) {
        if (mutableObject == null || mutableObject.getValue() == null) {
            return;
        }
        try {
            log.info("Unsubscribing from topic '" + StringUtils.join(((org.apache.kafka.clients.consumer.Consumer) mutableObject.getValue()).subscription(), ",'") + "'.");
            ((org.apache.kafka.clients.consumer.Consumer) mutableObject.getValue()).unsubscribe();
        } catch (Throwable th) {
            log.error(th.getMessage(), th);
        }
        try {
            ((org.apache.kafka.clients.consumer.Consumer) mutableObject.getValue()).close();
        } catch (Throwable th2) {
            log.error(th2.getMessage(), th2);
        }
        mutableObject.setValue((Object) null);
    }

    public ConsistentKafkaConsumer<T> setTopics(List<String> list) {
        this.topics = list;
        return this;
    }

    public ConsistentKafkaConsumer<T> setKafkaPropertiesSupplier(Supplier<Map<String, Object>> supplier) {
        this.kafkaPropertiesSupplier = supplier;
        return this;
    }

    public ConsistentKafkaConsumer<T> setShouldFinishPredicate(Supplier<Boolean> supplier) {
        this.shouldFinishPredicate = supplier;
        return this;
    }

    public ConsistentKafkaConsumer<T> setShouldPollPredicate(Supplier<Boolean> supplier) {
        this.shouldPollPredicate = supplier;
        return this;
    }

    public ConsistentKafkaConsumer<T> setRecordConsumer(Consumer<ConsumerRecord<String, T>> consumer) {
        this.recordConsumer = consumer;
        return this;
    }

    public ConsistentKafkaConsumer<T> setDelayTimeout(Duration duration) {
        this.delayTimeout = duration;
        return this;
    }

    public ConsistentKafkaConsumer<T> setMeterHelper(IMeterHelper iMeterHelper) {
        this.meterHelper = iMeterHelper;
        return this;
    }

    public ConsistentKafkaConsumer<T> setErrorLoggingThrottler(IErrorLoggingThrottler iErrorLoggingThrottler) {
        this.errorLoggingThrottler = iErrorLoggingThrottler;
        return this;
    }
}
