package com.brihaspathee.zeus.broker.consumer;

import com.brihaspathee.zeus.broker.message.AccountProcessingRequest;
import com.brihaspathee.zeus.broker.producer.AccountProcessingResponseProducer;
import com.brihaspathee.zeus.constants.ZeusServiceNames;
import com.brihaspathee.zeus.domain.entity.PayloadTracker;
import com.brihaspathee.zeus.domain.entity.PayloadTrackerDetail;
import com.brihaspathee.zeus.helper.interfaces.PayloadTrackerDetailHelper;
import com.brihaspathee.zeus.helper.interfaces.PayloadTrackerHelper;
import com.brihaspathee.zeus.message.Acknowledgement;
import com.brihaspathee.zeus.message.MessageMetadata;
import com.brihaspathee.zeus.message.ZeusMessagePayload;
import com.brihaspathee.zeus.service.interfaces.TransactionProcessor;
import com.brihaspathee.zeus.util.ZeusRandomStringGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.LocalDateTime;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
 * Kafka listener for account processing requests.
 *
 * <p>For every record received on {@code ZEUS.ACCOUNT.PROCESSING.REQ} the listener:
 * <ol>
 *   <li>rebuilds the typed request payload from the record value,</li>
 *   <li>stores an inbound {@link PayloadTracker} for it,</li>
 *   <li>builds an {@link Acknowledgement} and stores it as an outbound
 *       {@link PayloadTrackerDetail},</li>
 *   <li>hands the request to the {@link TransactionProcessor} and forwards each response it
 *       emits to the {@link AccountProcessingResponseProducer},</li>
 *   <li>returns the acknowledgement, which {@code @SendTo} routes to
 *       {@code ZEUS.ACCOUNT.PROCESSING.ACK}.</li>
 * </ol>
 */
@Component
public class AccountProcessingListener {

    private static final Logger log = LoggerFactory.getLogger(AccountProcessingListener.class);

    private final ObjectMapper objectMapper;
    private final PayloadTrackerHelper payloadTrackerHelper;
    private final PayloadTrackerDetailHelper payloadTrackerDetailHelper;
    private final TransactionProcessor transactionProcessor;
    private final AccountProcessingResponseProducer accountProcessingResponseProducer;

    /**
     * Creates the listener with all of its collaborators.
     *
     * @param objectMapper mapper used to (de)serialize message payloads
     * @param payloadTrackerHelper helper used to store the inbound payload tracker
     * @param payloadTrackerDetailHelper helper used to store the outbound acknowledgement detail
     * @param transactionProcessor processor that handles the account processing request
     * @param accountProcessingResponseProducer producer used to publish processing responses
     */
    public AccountProcessingListener(ObjectMapper objectMapper, PayloadTrackerHelper payloadTrackerHelper, PayloadTrackerDetailHelper payloadTrackerDetailHelper, TransactionProcessor transactionProcessor, AccountProcessingResponseProducer accountProcessingResponseProducer) {
        this.objectMapper = objectMapper;
        this.payloadTrackerHelper = payloadTrackerHelper;
        this.payloadTrackerDetailHelper = payloadTrackerDetailHelper;
        this.transactionProcessor = transactionProcessor;
        this.accountProcessingResponseProducer = accountProcessingResponseProducer;
    }

    /**
     * Handles one account processing request and returns the acknowledgement for it.
     *
     * @param consumerRecord the Kafka record carrying the request payload
     * @return the acknowledgement message for the received request
     * @throws JsonProcessingException if a payload cannot be serialized or deserialized
     */
    @KafkaListener(topics = {"ZEUS.ACCOUNT.PROCESSING.REQ"})
    @SendTo({"ZEUS.ACCOUNT.PROCESSING.ACK"})
    public ZeusMessagePayload<Acknowledgement> listenForTransactionRequest(ConsumerRecord<String, ZeusMessagePayload<AccountProcessingRequest>> consumerRecord) throws JsonProcessingException {
        log.info("Transaction request received ");
        // The record value is serialized back to JSON and re-read with a full TypeReference so the
        // nested generic payload is bound to AccountProcessingRequest.
        // NOTE(review): presumably the Kafka deserializer does not retain the generic type - confirm.
        String rawPayload = this.objectMapper.writeValueAsString(consumerRecord.value());
        ZeusMessagePayload<AccountProcessingRequest> zeusMessagePayload = this.objectMapper.readValue(rawPayload, new TypeReference<ZeusMessagePayload<AccountProcessingRequest>>() {
        });
        String accountNumber = zeusMessagePayload.getPayload().getAccountNumber();
        log.info("Request received for account {}", accountNumber);
        log.info("Transaction Details {}", zeusMessagePayload.getPayload().getTransactionDto());
        PayloadTracker payloadTracker = createPayloadTracker(zeusMessagePayload);
        log.info("Account information received in the member management service for processing:{}", accountNumber);
        // The acknowledgement is built and stored before processing is kicked off.
        ZeusMessagePayload<Acknowledgement> acknowledgement = createAcknowledgment(payloadTracker);
        processAccount(zeusMessagePayload, payloadTracker);
        log.info("Sending the ack for account {}", accountNumber);
        log.info("Ack id is {}", acknowledgement.getPayload().getAckId());
        return acknowledgement;
    }

    /**
     * Stores an inbound payload tracker for the received request, keyed by the transaction's ZTCN.
     *
     * @param zeusMessagePayload the received request message
     * @return the payload tracker returned by the {@link PayloadTrackerHelper}
     * @throws JsonProcessingException if the message cannot be serialized for storage
     */
    private PayloadTracker createPayloadTracker(ZeusMessagePayload<AccountProcessingRequest> zeusMessagePayload) throws JsonProcessingException {
        PayloadTracker payloadTracker = PayloadTracker.builder()
                .payload_key(zeusMessagePayload.getPayload().getTransactionDto().getZtcn())
                .payload_key_type_code("TRANSACTION")
                .payloadId(zeusMessagePayload.getPayloadId())
                .payloadDirectionTypeCode("INBOUND")
                .sourceDestinations(StringUtils.join(zeusMessagePayload.getMessageMetadata().getMessageSource()))
                .payload(this.objectMapper.writeValueAsString(zeusMessagePayload))
                .build();
        return this.payloadTrackerHelper.createPayloadTracker(payloadTracker);
    }

    /**
     * Builds the acknowledgement for a tracked request and stores it as an outbound payload
     * tracker detail.
     *
     * @param payloadTracker the tracker of the request being acknowledged
     * @return the acknowledgement message addressed to the transaction manager
     * @throws JsonProcessingException if the acknowledgement cannot be serialized for storage
     */
    private ZeusMessagePayload<Acknowledgement> createAcknowledgment(PayloadTracker payloadTracker) throws JsonProcessingException {
        String[] messageDestinations = {ZeusServiceNames.TRANSACTION_MANAGER};
        // A random 15 character string is used as the acknowledgement id.
        String ackId = ZeusRandomStringGenerator.randomString(15);
        MessageMetadata messageMetadata = MessageMetadata.builder()
                .messageDestination(messageDestinations)
                .messageSource(ZeusServiceNames.ACCOUNT_PROCESSOR_SERVICE)
                .messageCreationTimestamp(LocalDateTime.now())
                .build();
        Acknowledgement acknowledgement = Acknowledgement.builder()
                .ackId(ackId)
                .requestPayloadId(payloadTracker.getPayloadId())
                .build();
        // Explicit type witness so the builder chain yields ZeusMessagePayload<Acknowledgement>.
        ZeusMessagePayload<Acknowledgement> ackPayload = ZeusMessagePayload.<Acknowledgement>builder()
                .messageMetadata(messageMetadata)
                .payload(acknowledgement)
                .build();
        PayloadTrackerDetail payloadTrackerDetail = PayloadTrackerDetail.builder()
                .payloadTracker(payloadTracker)
                .responseTypeCode("ACKNOWLEDGEMENT")
                .responsePayload(this.objectMapper.writeValueAsString(ackPayload))
                .responsePayloadId(ackId)
                .payloadDirectionTypeCode("OUTBOUND")
                .sourceDestinations(StringUtils.join((Object[]) messageDestinations, ','))
                .build();
        this.payloadTrackerDetailHelper.createPayloadTrackerDetail(payloadTrackerDetail);
        return ackPayload;
    }

    /**
     * Hands the request to the transaction processor and publishes each response it emits.
     *
     * <p>Failures while publishing a response, and errors signalled by the processing pipeline,
     * are logged and not rethrown.
     *
     * @param zeusMessagePayload the received request message
     * @param payloadTracker the tracker stored for the request
     * @throws JsonProcessingException declared for the processing call
     */
    private void processAccount(ZeusMessagePayload<AccountProcessingRequest> zeusMessagePayload, PayloadTracker payloadTracker) throws JsonProcessingException {
        AccountProcessingRequest request = zeusMessagePayload.getPayload();
        String accountNumber = request.getAccountNumber();
        log.info("{}", accountNumber);
        log.info("Inside process account method for the transaction:{}", accountNumber);
        log.info("The payload tracker is:{}", payloadTracker.getPayloadId());
        // NOTE(review): processTransaction presumably returns a Reactor publisher; the second
        // lambda is its error callback - confirm against TransactionProcessor.
        this.transactionProcessor.processTransaction(request, payloadTracker).subscribe(
                accountProcessingResponse -> {
                    try {
                        log.info("About to call the producer for the account:{}", accountProcessingResponse.getAccountNumber());
                        this.accountProcessingResponseProducer.sendAccountProcessingResponse(accountProcessingResponse);
                    } catch (Exception e) {
                        // Best effort: a failed publish is logged with context instead of printed to stderr.
                        log.error("Unable to send the account processing response for the account:{}", accountNumber, e);
                    }
                },
                error -> log.error("Processing failed for the account:{}", accountNumber, error));
    }
}
