package com.transferwise.tasks.impl.tokafka.test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.transferwise.common.baseutils.ExceptionUtils;
import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.domain.TaskStatus;
import com.transferwise.tasks.impl.tokafka.ToKafkaMessages;
import com.transferwise.tasks.impl.tokafka.ToKafkaTaskType;
import com.transferwise.tasks.test.ITestTasksService;
import com.transferwise.tasks.test.TaskTrackerHandler;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;

/* loaded from: input_file:com/transferwise/tasks/impl/tokafka/test/ToKafkaTestHelper.class */
public class ToKafkaTestHelper implements IToKafkaTestHelper {
    private static final Logger log = LoggerFactory.getLogger(ToKafkaTestHelper.class);

    @Autowired
    private ITestTasksService testTasksService;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /* loaded from: input_file:com/transferwise/tasks/impl/tokafka/test/ToKafkaTestHelper$SendKafkaEventHandler.class */
    public static class SendKafkaEventHandler implements AutoCloseable {
        private final TaskTrackerHandler interceptAddTasks;
        private final ObjectMapper objectMapper;
        private final ITestTasksService testTasksService;

        public Stream<ToKafkaMessages.Message> getSentKafkaMessages() {
            return this.interceptAddTasks.getRequests().stream().map(addTaskRequest -> {
                return addTaskRequest.getData() != null ? (ToKafkaMessages) addTaskRequest.getData() : (ToKafkaMessages) ExceptionUtils.doUnchecked(() -> {
                    return (ToKafkaMessages) this.objectMapper.readValue(addTaskRequest.getDataString(), ToKafkaMessages.class);
                });
            }).flatMap(toKafkaMessages -> {
                return toKafkaMessages.getMessages().stream();
            });
        }

        public Stream<ToKafkaMessages.Message> getSentKafkaMessages(String str) {
            return this.interceptAddTasks.getRequests().stream().filter(addTaskRequest -> {
                return str.equals(addTaskRequest.getSubType());
            }).map(addTaskRequest2 -> {
                return addTaskRequest2.getData() != null ? (ToKafkaMessages) addTaskRequest2.getData() : (ToKafkaMessages) ExceptionUtils.doUnchecked(() -> {
                    return (ToKafkaMessages) this.objectMapper.readValue(addTaskRequest2.getDataString(), ToKafkaMessages.class);
                });
            }).flatMap(toKafkaMessages -> {
                return toKafkaMessages.getMessages().stream();
            });
        }

        public <T> Stream<T> getSentKafkaMessages(String str, Class<T> cls) {
            return (Stream<T>) getSentKafkaMessages(str).map(message -> {
                return ExceptionUtils.doUnchecked(() -> {
                    return this.objectMapper.readValue(message.getMessage(), cls);
                });
            });
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.testTasksService.stopTracking(this.interceptAddTasks);
        }

        public SendKafkaEventHandler(TaskTrackerHandler taskTrackerHandler, ObjectMapper objectMapper, ITestTasksService iTestTasksService) {
            this.interceptAddTasks = taskTrackerHandler;
            this.objectMapper = objectMapper;
            this.testTasksService = iTestTasksService;
        }
    }

    @Override // com.transferwise.tasks.impl.tokafka.test.IToKafkaTestHelper
    public <T> List<T> getSentKafkaMessages(String str, Class<T> cls) {
        return (List) ExceptionUtils.doUnchecked(() -> {
            ArrayList arrayList = new ArrayList();
            List<Task> finishedTasks = this.testTasksService.getFinishedTasks(ToKafkaTaskType.VALUE, str);
            if (finishedTasks.isEmpty()) {
                return arrayList;
            }
            for (Task task : finishedTasks) {
                if (TaskStatus.DONE.name().equals(task.getStatus())) {
                    Iterator<ToKafkaMessages.Message> it = ((ToKafkaMessages) this.objectMapper.readValue(task.getData(), ToKafkaMessages.class)).getMessages().iterator();
                    while (it.hasNext()) {
                        arrayList.add(this.objectMapper.readValue(it.next().getMessage(), cls));
                    }
                }
            }
            return arrayList;
        });
    }

    @Override // com.transferwise.tasks.impl.tokafka.test.IToKafkaTestHelper
    public SendKafkaEventHandler trackKafkaMessagesEvents() {
        return new SendKafkaEventHandler(this.testTasksService.startTrackingAddTasks(addTaskRequest -> {
            return ToKafkaTaskType.VALUE.equals(addTaskRequest.getType());
        }), this.objectMapper, this.testTasksService);
    }

    @Override // com.transferwise.tasks.impl.tokafka.test.IToKafkaTestHelper
    public void sendDirectKafkaMessage(String str, Object obj) {
        sendDirectKafkaMessage(str, null, null, obj);
    }

    @Override // com.transferwise.tasks.impl.tokafka.test.IToKafkaTestHelper
    public void sendDirectKafkaMessage(String str, Long l, String str2, Object obj) {
        sendDirectKafkaMessage(new ProducerRecord<>(str, (Integer) null, l, str2, obj instanceof String ? (String) obj : (String) ExceptionUtils.doUnchecked(() -> {
            return this.objectMapper.writeValueAsString(obj);
        })));
    }

    @Override // com.transferwise.tasks.impl.tokafka.test.IToKafkaTestHelper
    public void sendDirectKafkaMessage(ProducerRecord<String, String> producerRecord) {
        this.kafkaTemplate.send(producerRecord).addCallback(sendResult -> {
            log.debug("Sent and acked Kafka message to topic '{}'.", producerRecord.topic());
        }, th -> {
            log.error("Sending message to Kafka topic '{}'.", producerRecord.topic(), th);
        });
    }

    @Override // com.transferwise.tasks.impl.tokafka.test.IToKafkaTestHelper
    public void cleanFinishedTasks(String str) {
        this.testTasksService.cleanFinishedTasks(ToKafkaTaskType.VALUE, str);
    }
}
