package com.transferwise.tasks.impl.tokafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.transferwise.common.baseutils.ExceptionUtils;
import com.transferwise.tasks.handler.ExponentialTaskRetryPolicy;
import com.transferwise.tasks.handler.SimpleTaskConcurrencyPolicy;
import com.transferwise.tasks.handler.SimpleTaskProcessingPolicy;
import com.transferwise.tasks.handler.TaskHandlerAdapter;
import com.transferwise.tasks.handler.interfaces.ITaskHandler;
import com.transferwise.tasks.impl.tokafka.ToKafkaMessages;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
/* loaded from: input_file:com/transferwise/tasks/impl/tokafka/ToKafkaTaskHandlerConfiguration.class */
public class ToKafkaTaskHandlerConfiguration {
    private static final Logger log = LoggerFactory.getLogger(ToKafkaTaskHandlerConfiguration.class);

    @Autowired(required = false)
    private MeterRegistry meterRegistry;

    @Bean
    public ITaskHandler toKafkaTaskHandler(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper, ToKafkaProperties toKafkaProperties) {
        return new TaskHandlerAdapter(iBaseTask -> {
            return ToKafkaTaskType.VALUE.equals(iBaseTask.getType());
        }, (iTask, runnable, consumer) -> {
            ExceptionUtils.doUnchecked(() -> {
                ToKafkaMessages toKafkaMessages = (ToKafkaMessages) objectMapper.readValue(iTask.getData(), ToKafkaMessages.class);
                AtomicInteger atomicInteger = new AtomicInteger(toKafkaMessages.getMessages().size());
                String topic = toKafkaMessages.getTopic();
                for (ToKafkaMessages.Message message : toKafkaMessages.getMessages()) {
                    kafkaTemplate.send(topic, message.getKey(), message.getMessage()).addCallback(sendResult -> {
                        log.debug("Sent and acked Kafka message to topic '{}'.", topic);
                        registerSentMessage(topic);
                        if (atomicInteger.decrementAndGet() == 0) {
                            runnable.run();
                        }
                    }, th -> {
                        log.error("Sending message to Kafka topic '" + topic + "'.", th);
                        consumer.accept(th);
                    });
                }
            });
        }).setConcurrencyPolicy(new SimpleTaskConcurrencyPolicy(toKafkaProperties.getMaxConcurrency())).setProcessingPolicy(new SimpleTaskProcessingPolicy().setMaxProcessingDuration(Duration.ofMillis(toKafkaProperties.getMaxProcessingDurationMs()))).setRetryPolicy(new ExponentialTaskRetryPolicy().setDelay(Duration.ofMillis(toKafkaProperties.getRetryDelayMs())).setMultiplier(toKafkaProperties.getRetryExponent()).setMaxCount(toKafkaProperties.getRetryMaxCount()).setMaxDelay(Duration.ofMillis(toKafkaProperties.getRetryMaxDelayMs())));
    }

    private void registerSentMessage(String str) {
        if (this.meterRegistry != null) {
            this.meterRegistry.counter("twTasks.toKafka.sentMessagesCount", Tags.of("topic", str));
        }
    }
}
