package com.transferwise.tasks.impl.tokafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.transferwise.common.baseutils.ExceptionUtils;
import com.transferwise.common.baseutils.tracing.IWithXRequestId;
import com.transferwise.common.baseutils.tracing.IXRequestIdHolder;
import com.transferwise.tasks.ITasksService;
import com.transferwise.tasks.impl.tokafka.IToKafkaSenderService;
import com.transferwise.tasks.impl.tokafka.ToKafkaMessages;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Transactional;

/* loaded from: input_file:com/transferwise/tasks/impl/tokafka/ToKafkaSenderService.class */
public class ToKafkaSenderService implements IToKafkaSenderService {
    private static final Logger log = LoggerFactory.getLogger(ToKafkaSenderService.class);
    private static final int MEGABYTE_MULTIPLIER = 1000000;
    private static final double KILOBYTE_DENOM = 1024.0d;
    private final ObjectMapper objectMapper;
    private final ITasksService taskService;
    private final int batchSizeMb;
    private final IXRequestIdHolder xRequestIdHolder;

    @Override // com.transferwise.tasks.impl.tokafka.IToKafkaSenderService
    @Transactional(rollbackFor = {Exception.class})
    public void sendMessage(IToKafkaSenderService.SendMessageRequest sendMessageRequest) {
        ToKafkaMessages add = new ToKafkaMessages().setTopic(sendMessageRequest.getTopic()).add(convert(sendMessageRequest));
        this.taskService.addTask(new ITasksService.AddTaskRequest().setType(ToKafkaTaskType.VALUE).setSubType(add.getTopic()).setData(add).setRunAfterTime(sendMessageRequest.getSendAfterTime()));
    }

    @Override // com.transferwise.tasks.impl.tokafka.IToKafkaSenderService
    @Transactional(rollbackFor = {Exception.class})
    public void sendMessages(IToKafkaSenderService.SendMessagesRequest sendMessagesRequest) {
        splitToBatches((List) sendMessagesRequest.getMessages().stream().map(this::convert).collect(Collectors.toList()), this.batchSizeMb * MEGABYTE_MULTIPLIER).forEach(list -> {
            sendBatch(sendMessagesRequest, list);
        });
    }

    protected ToKafkaMessages.Message convert(IToKafkaSenderService.SendMessageRequest sendMessageRequest) {
        return toKafkaMessage(sendMessageRequest.getKey(), sendMessageRequest.getPayloadString(), sendMessageRequest.getPayload());
    }

    protected ToKafkaMessages.Message convert(IToKafkaSenderService.SendMessagesRequest.Message message) {
        return toKafkaMessage(String.valueOf(message.getKey()), message.getPayloadString(), message.getPayload());
    }

    protected List<List<ToKafkaMessages.Message>> splitToBatches(List<ToKafkaMessages.Message> list, int i) {
        if (list.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        int i2 = 0;
        ArrayList arrayList2 = new ArrayList();
        ToKafkaMessages.Message message = list.get(0);
        int approxSize = 0 + message.getApproxSize();
        arrayList2.add(message);
        for (ToKafkaMessages.Message message2 : list.subList(1, list.size())) {
            int approxSize2 = message2.getApproxSize();
            if (approxSize + approxSize2 > i) {
                arrayList.add(arrayList2);
                i2++;
                if (log.isDebugEnabled()) {
                    log.debug("Big bunch of messages was received in one go. Splitting it to batches. {}-th batch size is {} messages, {} KiB", new Object[]{Integer.valueOf(i2), Integer.valueOf(arrayList2.size()), Double.valueOf(approxSize / KILOBYTE_DENOM)});
                }
                arrayList2 = new ArrayList();
                approxSize = 0;
            }
            approxSize += approxSize2;
            arrayList2.add(message2);
        }
        if (i2 != 0 && log.isDebugEnabled()) {
            log.debug("Big bunch of messages was received in one go. Splitting it to batches. {}-th batch size is {} messages, {} KiB", new Object[]{Integer.valueOf(i2 + 1), Integer.valueOf(arrayList2.size()), Double.valueOf(approxSize / KILOBYTE_DENOM)});
        }
        arrayList.add(arrayList2);
        return arrayList;
    }

    private void sendBatch(IToKafkaSenderService.SendMessagesRequest sendMessagesRequest, List<ToKafkaMessages.Message> list) {
        ToKafkaMessages messages = new ToKafkaMessages().setTopic(sendMessagesRequest.getTopic()).setMessages(list);
        this.taskService.addTask(new ITasksService.AddTaskRequest().setType(ToKafkaTaskType.VALUE).setSubType(messages.getTopic()).setData(messages).setRunAfterTime(sendMessagesRequest.getSendAfterTime()));
    }

    private void enrichXRequestId(Object obj) {
        if (this.xRequestIdHolder == null || !(obj instanceof IWithXRequestId)) {
            return;
        }
        IWithXRequestId iWithXRequestId = (IWithXRequestId) obj;
        if (iWithXRequestId.getXRequestId() == null) {
            iWithXRequestId.setXRequestId(this.xRequestIdHolder.current());
        }
    }

    private ToKafkaMessages.Message toKafkaMessage(String str, String str2, Object obj) {
        ToKafkaMessages.Message key = new ToKafkaMessages.Message().setKey(str);
        if (str2 != null) {
            key.setMessage(str2);
        } else {
            enrichXRequestId(obj);
            key.setMessage((String) ExceptionUtils.doUnchecked(() -> {
                return this.objectMapper.writeValueAsString(obj);
            }));
        }
        return key;
    }

    public ToKafkaSenderService(ObjectMapper objectMapper, ITasksService iTasksService, int i, IXRequestIdHolder iXRequestIdHolder) {
        this.objectMapper = objectMapper;
        this.taskService = iTasksService;
        this.batchSizeMb = i;
        this.xRequestIdHolder = iXRequestIdHolder;
    }
}
