package com.transferwise.tasks.helpers.kafka.messagetotask;

import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy;
import com.transferwise.tasks.TasksProperties;
import com.transferwise.tasks.helpers.IErrorLoggingThrottler;
import com.transferwise.tasks.helpers.IMeterHelper;
import com.transferwise.tasks.helpers.executors.IExecutorsHelper;
import com.transferwise.tasks.helpers.kafka.ConsistentKafkaConsumer;
import com.transferwise.tasks.helpers.kafka.ITopicPartitionsManager;
import com.transferwise.tasks.helpers.kafka.messagetotask.IKafkaMessageHandler;
import com.transferwise.tasks.utils.WaitUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

/* loaded from: input_file:com/transferwise/tasks/helpers/kafka/messagetotask/CoreKafkaListener.class */
public class CoreKafkaListener<T> implements GracefulShutdownStrategy {
    private static final Logger log = LoggerFactory.getLogger(CoreKafkaListener.class);

    @Autowired
    private KafkaProperties kafkaProperties;

    @Autowired
    private IExecutorsHelper executorsHelper;

    @Autowired
    private TasksProperties tasksProperties;

    @Autowired
    private IKafkaMessageHandlerRegistry<T> kafkaMessageHandlerRegistry;

    @Autowired
    private ITopicPartitionsManager topicPartitionsManager;

    @Autowired
    private IErrorLoggingThrottler errorLoggingThrottler;

    @Autowired
    private IMeterHelper meterHelper;
    private ExecutorService executorService;
    private boolean shuttingDown;
    private List<MyTopic> topics = new ArrayList();
    private List<String> kafkaDataCenterPrefixes;

    /* loaded from: input_file:com/transferwise/tasks/helpers/kafka/messagetotask/CoreKafkaListener$MyTopic.class */
    private static class MyTopic {
        private String address;
        private boolean configured;
        private Integer partitionsCount;

        public String getAddress() {
            return this.address;
        }

        public boolean isConfigured() {
            return this.configured;
        }

        public Integer getPartitionsCount() {
            return this.partitionsCount;
        }

        public MyTopic setAddress(String str) {
            this.address = str;
            return this;
        }

        public MyTopic setConfigured(boolean z) {
            this.configured = z;
            return this;
        }

        public MyTopic setPartitionsCount(Integer num) {
            this.partitionsCount = num;
            return this;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof MyTopic)) {
                return false;
            }
            MyTopic myTopic = (MyTopic) obj;
            if (!myTopic.canEqual(this)) {
                return false;
            }
            String address = getAddress();
            String address2 = myTopic.getAddress();
            if (address == null) {
                if (address2 != null) {
                    return false;
                }
            } else if (!address.equals(address2)) {
                return false;
            }
            if (isConfigured() != myTopic.isConfigured()) {
                return false;
            }
            Integer partitionsCount = getPartitionsCount();
            Integer partitionsCount2 = myTopic.getPartitionsCount();
            return partitionsCount == null ? partitionsCount2 == null : partitionsCount.equals(partitionsCount2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof MyTopic;
        }

        public int hashCode() {
            String address = getAddress();
            int hashCode = (((1 * 59) + (address == null ? 43 : address.hashCode())) * 59) + (isConfigured() ? 79 : 97);
            Integer partitionsCount = getPartitionsCount();
            return (hashCode * 59) + (partitionsCount == null ? 43 : partitionsCount.hashCode());
        }

        public String toString() {
            return "CoreKafkaListener.MyTopic(address=" + getAddress() + ", configured=" + isConfigured() + ", partitionsCount=" + getPartitionsCount() + ")";
        }
    }

    @PostConstruct
    public void init() {
        if (this.kafkaMessageHandlerRegistry.isEmpty()) {
            return;
        }
        this.kafkaDataCenterPrefixes = Arrays.asList(StringUtils.split(this.tasksProperties.getKafkaDataCenterPrefixes(), ","));
        Iterator<IKafkaMessageHandler<T>> it = this.kafkaMessageHandlerRegistry.getKafkaMessageHandlers().iterator();
        while (it.hasNext()) {
            for (IKafkaMessageHandler.Topic topic : it.next().getTopics()) {
                this.topics.add(new MyTopic().setAddress(topic.getAddress()).setPartitionsCount(topic.getSuggestedPartitionsCount()));
            }
        }
    }

    public void poll(List<String> list) {
        Map buildConsumerProperties = this.kafkaProperties.buildConsumerProperties();
        buildConsumerProperties.put("client.id", buildConsumerProperties.getOrDefault("client.id", "") + ".tw-tasks.core-listener");
        new ConsistentKafkaConsumer().setTopics(list).setDelayTimeout(this.tasksProperties.getGenericMediumDelay()).setShouldPollPredicate(() -> {
            return true;
        }).setShouldFinishPredicate(() -> {
            return Boolean.valueOf(this.shuttingDown);
        }).setKafkaPropertiesSupplier(() -> {
            return buildConsumerProperties;
        }).setRecordConsumer(consumerRecord -> {
            this.kafkaMessageHandlerRegistry.getForTopicOrFail(removeTopicPrefixes(consumerRecord.topic())).forEach(iKafkaMessageHandler -> {
                iKafkaMessageHandler.handle(consumerRecord);
            });
            this.meterHelper.registerKafkaCoreMessageProcessing(consumerRecord.topic());
        }).setErrorLoggingThrottler(this.errorLoggingThrottler).setMeterHelper(this.meterHelper).consume();
    }

    protected void addAddresses(List<String> list, String str) {
        list.add(getNamespacedTopic(str));
        Iterator<String> it = this.kafkaDataCenterPrefixes.iterator();
        while (it.hasNext()) {
            list.add(getNamespacedTopic(it.next() + str));
        }
    }

    private String getNamespacedTopic(String str) {
        return StringUtils.isNotEmpty(this.tasksProperties.getKafkaTopicsNamespace()) ? this.tasksProperties.getKafkaTopicsNamespace() + "." + str : str;
    }

    protected String removeTopicPrefixes(String str) {
        if (StringUtils.isNotEmpty(this.tasksProperties.getKafkaTopicsNamespace())) {
            String str2 = this.tasksProperties.getKafkaTopicsNamespace() + ".";
            if (str.startsWith(str2)) {
                str = StringUtils.substringAfter(str, str2);
            }
        }
        for (String str3 : this.kafkaDataCenterPrefixes) {
            if (str.startsWith(str3)) {
                str = StringUtils.substringAfter(str, str3);
            }
        }
        return str;
    }

    public void applicationStarted() {
        if (this.kafkaMessageHandlerRegistry.isEmpty()) {
            return;
        }
        this.executorService = this.executorsHelper.newCachedExecutor("core-kafka-listener");
        this.executorService.submit(() -> {
            if (this.tasksProperties.isCoreKafkaListenerTopicsConfiguringEnabled()) {
                for (MyTopic myTopic : this.topics) {
                    if (!myTopic.isConfigured() && myTopic.getPartitionsCount() != null) {
                        this.topicPartitionsManager.setPartitionsCount(getNamespacedTopic(myTopic.getAddress()), myTopic.getPartitionsCount().intValue());
                        myTopic.setConfigured(true);
                    }
                }
            }
            ArrayList arrayList = new ArrayList();
            Iterator<MyTopic> it = this.topics.iterator();
            while (it.hasNext()) {
                addAddresses(arrayList, it.next().getAddress());
            }
            arrayList.forEach(str -> {
                log.info("Listening topic '" + str + "'.");
            });
            while (!this.shuttingDown) {
                try {
                    poll(arrayList);
                } catch (Throwable th) {
                    log.error(th.getMessage(), th);
                    WaitUtils.sleepQuietly(this.tasksProperties.getGenericMediumDelay());
                }
            }
        });
    }

    public void prepareForShutdown() {
        this.shuttingDown = true;
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
    }

    public boolean canShutdown() {
        return this.executorService == null || this.executorService.isTerminated();
    }
}
