package qunar.tc.qmq.consumer.pull;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.base.BaseMessage;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.common.ClientType;
import qunar.tc.qmq.common.MapKeyBuilder;
import qunar.tc.qmq.consumer.pull.exception.AckException;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.metrics.MetricsConstants;
import qunar.tc.qmq.metrics.QmqCounter;
import qunar.tc.qmq.netty.client.NettyClient;
import qunar.tc.qmq.netty.client.ResponseFuture;
import qunar.tc.qmq.netty.exception.ClientSendException;
import qunar.tc.qmq.protocol.Datagram;
import qunar.tc.qmq.protocol.consumer.AckRequest;
import qunar.tc.qmq.protocol.consumer.AckRequestPayloadHolder;
import qunar.tc.qmq.util.RemotingBuilder;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:qunar/tc/qmq/consumer/pull/AckService.class */
public class AckService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AckService.class);

    /** Incremented by {@code monitorGetPullOffsetError} for every pulled message whose pull offset is rejected. */
    private static final QmqCounter GET_PULL_OFFSET_ERROR = Metrics.counter("qmq_pull_getpulloffset_error");

    /** Timeout, in milliseconds, handed to {@code NettyClient.sendAsync} for each ack request. */
    private static final long ACK_REQUEST_TIMEOUT_MILLIS = 10000;

    private final BrokerService brokerService;
    private final SendMessageBack sendMessageBack;
    private final DelayMessageService delayMessageService;

    /** Value written into the {@code consumerId} field of every ack request; assigned through {@link #setClientId(String)}. */
    private String clientId;

    /** Client used to send ack requests asynchronously. */
    private final NettyClient client = NettyClient.getClient();

    /** Ack send queues keyed by {@code MapKeyBuilder.buildSenderKey(brokerGroupName, subject, group)}. */
    private final ConcurrentMap<String, AckSendQueue> senderMap = new ConcurrentHashMap<>();

    /** Seconds converted to milliseconds and passed to each queue's {@code destroy} call; defaults to 5. */
    private int destroyWaitInSeconds = 5;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:qunar/tc/qmq/consumer/pull/AckService$AckResponseCallback.class */
    /**
     * Response callback attached to one ack request.
     *
     * <p>It records the request cost time, then reports the outcome to the supplied
     * {@link SendAckCallback}: success when the response header code is {@code 0}, failure
     * otherwise. On any failure it also asks the {@link BrokerService} to refresh the
     * consumer routing for the request's subject and group.
     */
    public static final class AckResponseCallback implements ResponseFuture.Callback {
        private final AckRequest request;
        private final SendAckCallback sendAckCallback;
        private final BrokerService brokerService;

        /**
         * @param ackRequest      the ack request this callback belongs to
         * @param sendAckCallback receiver of the success / failure notification
         * @param brokerService   service asked to refresh routing after a failure
         */
        AckResponseCallback(AckRequest ackRequest, SendAckCallback sendAckCallback, BrokerService brokerService) {
            this.request = ackRequest;
            this.sendAckCallback = sendAckCallback;
            this.brokerService = brokerService;
        }

        /**
         * Evaluates the completed request and notifies the ack callback.
         *
         * @param responseFuture the completed future of the ack request
         */
        public void processResponse(ResponseFuture responseFuture) {
            final String subject = this.request.getSubject();
            final String group = this.request.getGroup();

            // The cost time is recorded for every outcome, before the response is inspected.
            AckService.monitorAckTime(subject, group, responseFuture.getRequestCostTime());

            final Datagram response = responseFuture.getResponse();
            if (responseFuture.isSendOk() && response != null) {
                final short code = response.getHeader().getCode();
                if (code != 0) {
                    // Broker answered with a non-zero code: reported as 100 + code.
                    AckService.monitorAckError(subject, group, 100 + code);
                    this.brokerService.refresh(ClientType.CONSUMER, subject, group);
                    this.sendAckCallback.fail(new AckException("responseCode: " + code));
                } else {
                    this.sendAckCallback.success();
                }
                return;
            }

            // The request was not sent successfully or no response is available: reported as -1.
            AckService.monitorAckError(subject, group, -1);
            this.sendAckCallback.fail(new AckException("send fail"));
            this.brokerService.refresh(ClientType.CONSUMER, subject, group);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:qunar/tc/qmq/consumer/pull/AckService$SendAckCallback.class */
    /**
     * Receives the outcome of one ack request sent through {@code AckService.sendAck}.
     */
    public interface SendAckCallback {
        /** Called when the broker's response carries header code {@code 0}. */
        void success();

        /**
         * Called when the ack request could not be sent, no response was available, or the
         * response carried a non-zero header code.
         *
         * @param exc the failure; either the exception raised while sending or an
         *            {@code AckException} describing the failed response
         */
        void fail(Exception exc);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AckService(BrokerService brokerService) {
        this.brokerService = brokerService;
        this.sendMessageBack = new SendMessageBackImpl(brokerService);
        this.delayMessageService = new DelayMessageService(brokerService, this.sendMessageBack);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wraps the messages of a pull result into {@link PulledMessage}s and registers their ack
     * entries with the send queue for the (broker group, subject, group) of this pull.
     *
     * <p>A message whose pull offset is lower than the last accepted offset is skipped and
     * counted as an offset error; this includes the {@code -1} that {@code getOffset} yields for
     * a missing or unparsable offset. The ack entries of all remaining messages are chained in
     * iteration order and appended to the send queue. Messages rejected by the filter are
     * acked immediately and left out of the result. When the pull is configured as
     * consume-most-once, the returned messages are acked before they are handed out.
     *
     * @param pullParam           parameters of the pull (subject, group, broadcast flag, ...)
     * @param pullResult          result holding the pulled messages and their broker group
     * @param ackHook             hook handed to every created {@link PulledMessage}
     * @param pulledMessageFilter filter deciding which messages are returned to the caller
     * @return the messages accepted by the filter, in pull order
     */
    public List<PulledMessage> buildPulledMessages(PullParam pullParam, PullResult pullResult, AckHook ackHook, PulledMessageFilter pulledMessageFilter) {
        final List<BaseMessage> messages = pullResult.getMessages();
        final List<PulledMessage> accepted = new ArrayList<>(messages.size());
        final List<PulledMessage> ignored = new ArrayList<>();
        final List<AckEntry> ackEntries = new ArrayList<>(messages.size());
        final AckSendQueue sendQueue = getOrCreateSendQueue(pullResult.getBrokerGroup(), pullParam.getSubject(), pullParam.getGroup(), pullParam.isBroadcast());

        long previousOffset = 0;
        AckEntry previousEntry = null;
        for (BaseMessage message : messages) {
            final long offset = getOffset(message);
            if (offset < previousOffset) {
                // Offset is missing, unparsable or went backwards: report it and drop the message.
                monitorGetPullOffsetError(message);
                continue;
            }
            previousOffset = offset;

            final AckEntry entry = new AckEntry(sendQueue, offset, this.delayMessageService);
            ackEntries.add(entry);
            if (previousEntry != null) {
                previousEntry.setNext(entry);
            }
            previousEntry = entry;

            final PulledMessage pulledMessage = new PulledMessage(message, entry, ackHook);
            // The filter runs before the subject and consumer-group property are rewritten below.
            if (pulledMessageFilter.filter(pulledMessage)) {
                accepted.add(pulledMessage);
            } else {
                ignored.add(pulledMessage);
            }
            pulledMessage.setSubject(pullParam.getOriginSubject());
            pulledMessage.setProperty(BaseMessage.keys.qmq_consumerGroupName, pullParam.getGroup());
        }

        sendQueue.append(ackEntries);
        ackIgnoreMessages(ignored);
        preAckOnDemand(accepted, pullParam.isConsumeMostOnce());
        return accepted;
    }

    /**
     * Acks every given message up front when the pull is consume-most-once; does nothing
     * otherwise.
     *
     * @param messages        messages about to be handed to the consumer
     * @param consumeMostOnce whether the messages must be acked before they are processed
     */
    private void preAckOnDemand(List<PulledMessage> messages, boolean consumeMostOnce) {
        // The flag does not change per message, so decide once instead of walking the list.
        if (!consumeMostOnce) {
            return;
        }
        for (PulledMessage message : messages) {
            AckHelper.ackWithTrace(message, null);
        }
    }

    /**
     * Acks each of the given messages through {@code AckHelper.ackWithTrace}.
     *
     * @param ignored messages to ack; the caller passes those rejected by the pulled-message filter
     */
    private void ackIgnoreMessages(List<PulledMessage> ignored) {
        for (PulledMessage message : ignored) {
            AckHelper.ackWithTrace(message, null);
        }
    }

    /**
     * Returns the ack send queue registered for the given broker group, subject and consumer
     * group, creating and initialising it on first use.
     *
     * @param brokerGroupInfo broker group the queue sends acks to
     * @param subject         message subject
     * @param group           consumer group
     * @param broadcast       broadcast flag passed to a newly created queue
     * @return the existing or newly created queue
     */
    private AckSendQueue getOrCreateSendQueue(BrokerGroupInfo brokerGroupInfo, String subject, String group, boolean broadcast) {
        final String senderKey = MapKeyBuilder.buildSenderKey(brokerGroupInfo.getGroupName(), subject, group);
        return this.senderMap.computeIfAbsent(senderKey, absentKey -> createSendQueue(brokerGroupInfo, subject, group, broadcast));
    }

    /** Builds a new send queue for the given coordinates and calls {@code init()} on it before returning it. */
    private AckSendQueue createSendQueue(BrokerGroupInfo brokerGroupInfo, String subject, String group, boolean broadcast) {
        final AckSendQueue queue = new AckSendQueue(brokerGroupInfo.getGroupName(), subject, group, this, this.brokerService, this.sendMessageBack, broadcast);
        queue.init();
        return queue;
    }

    /**
     * Reads the pull offset stored in the message's {@code qmq_pullOffset} property.
     *
     * @param baseMessage message to inspect
     * @return the parsed offset, or {@code -1} when the property is absent or cannot be parsed
     *         as a {@code long}
     */
    private long getOffset(BaseMessage baseMessage) {
        final Object rawOffset = baseMessage.getProperty(BaseMessage.keys.qmq_pullOffset);
        if (rawOffset != null) {
            try {
                return Long.parseLong(rawOffset.toString());
            } catch (Exception ignored) {
                // Any failure while reading the value is reported through the -1 sentinel below.
            }
        }
        return -1L;
    }

    /**
     * Logs an error for a message whose pull offset was rejected and increments the
     * {@code qmq_pull_getpulloffset_error} counter.
     *
     * @param baseMessage the message with the rejected pull offset
     */
    private static void monitorGetPullOffsetError(BaseMessage baseMessage) {
        // Parameterized form, matching the logging style used by monitorAckError.
        LOGGER.error("lost pull offset. msgId={}", baseMessage.getMessageId());
        GET_PULL_OFFSET_ERROR.inc();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds an ack request for the given offset range and sends it asynchronously to the
     * master of the broker group.
     *
     * @param brokerGroupInfo broker group whose master receives the ack
     * @param str             message subject
     * @param str2            consumer group
     * @param ackSendEntry    entry supplying the first and last pull offset and the broadcast flag
     * @param sendAckCallback notified with the outcome of the request
     */
    public void sendAck(BrokerGroupInfo brokerGroupInfo, String str, String str2, AckSendEntry ackSendEntry, SendAckCallback sendAckCallback) {
        final AckRequest ackRequest = buildAckRequest(str, str2, ackSendEntry);
        final AckRequestPayloadHolder payload = new AckRequestPayloadHolder(ackRequest);
        // NOTE(review): 12 is presumably the protocol command code for ack requests - confirm.
        final Datagram datagram = RemotingBuilder.buildRequestDatagram((short) 12, payload);
        sendRequest(brokerGroupInfo, str, str2, ackRequest, datagram, sendAckCallback);
    }

    /**
     * Creates the ack request for one offset range.
     *
     * @param subject      message subject
     * @param group        consumer group
     * @param ackSendEntry entry supplying the first and last pull offset and the broadcast flag
     * @return a request populated with subject, group, this service's client id, the offset
     *         range, and a broadcast byte of {@code 1} or {@code 0}
     */
    private AckRequest buildAckRequest(String subject, String group, AckSendEntry ackSendEntry) {
        final AckRequest request = new AckRequest();
        request.setSubject(subject);
        request.setGroup(group);
        request.setConsumerId(this.clientId);
        request.setPullOffsetBegin(ackSendEntry.getPullOffsetBegin());
        request.setPullOffsetLast(ackSendEntry.getPullOffsetLast());

        final byte broadcastFlag;
        if (ackSendEntry.isBroadcast()) {
            broadcastFlag = 1;
        } else {
            broadcastFlag = 0;
        }
        request.setBroadcast(broadcastFlag);
        return request;
    }

    /**
     * Sends the ack datagram asynchronously to the broker group's master. Exceptions raised
     * while sending are not propagated; they are reported to the metrics and to the callback.
     *
     * @param brokerGroupInfo broker group whose master receives the request
     * @param subject         message subject, used for error reporting
     * @param group           consumer group, used for error reporting
     * @param ackRequest      the request carried by the datagram
     * @param datagram        the encoded request to send
     * @param sendAckCallback notified with the outcome
     */
    private void sendRequest(BrokerGroupInfo brokerGroupInfo, String subject, String group, AckRequest ackRequest, Datagram datagram, SendAckCallback sendAckCallback) {
        try {
            this.client.sendAsync(brokerGroupInfo.getMaster(), datagram, ACK_REQUEST_TIMEOUT_MILLIS, new AckResponseCallback(ackRequest, sendAckCallback, this.brokerService));
        } catch (ClientSendException e) {
            // The ordinal of the send error code is used as the reported error code.
            reportSendFailure(subject, group, e.getSendErrorCode().ordinal(), e, sendAckCallback);
        } catch (Exception e) {
            reportSendFailure(subject, group, -1, e, sendAckCallback);
        }
    }

    /** Records an ack error with the given code and then fails the callback with the cause. */
    private static void reportSendFailure(String subject, String group, int errorCode, Exception cause, SendAckCallback sendAckCallback) {
        monitorAckError(subject, group, errorCode);
        sendAckCallback.fail(cause);
    }

    /**
     * Sets the client id that is written into the {@code consumerId} field of every ack
     * request built by this service.
     *
     * @param str the client id
     */
    public void setClientId(String str) {
        this.clientId = str;
    }

    /**
     * Sets the wait, in seconds, that {@link #destroy()} converts to milliseconds and passes
     * to each send queue's {@code destroy} call. The default is 5.
     *
     * @param i the wait in seconds
     */
    public void setDestroyWaitInSeconds(int i) {
        this.destroyWaitInSeconds = i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Calls {@code trySendAck(1000L)} on every registered send queue. An exception from one
     * queue is logged and does not stop the remaining queues from being processed.
     */
    public void tryCleanAck() {
        for (AckSendQueue queue : this.senderMap.values()) {
            try {
                queue.trySendAck(1000L);
            } catch (Exception e) {
                LOGGER.error("try clean ack exception", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Shuts the service down: first attempts to send outstanding acks via
     * {@link #tryCleanAck()}, then destroys every registered send queue, passing the
     * configured wait converted to milliseconds.
     *
     * <p>Every queue is destroyed even if an earlier one fails. The first runtime failure is
     * rethrown once all queues have been processed, with any later failures attached as
     * suppressed exceptions.
     *
     * @throws RuntimeException the first failure raised by a queue's {@code destroy} call
     */
    public void destroy() {
        tryCleanAck();

        RuntimeException firstFailure = null;
        for (AckSendQueue queue : this.senderMap.values()) {
            try {
                // The seconds-to-milliseconds conversion is done in int arithmetic, as before.
                queue.destroy(this.destroyWaitInSeconds * 1000);
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else if (e != firstFailure) {
                    // addSuppressed rejects self-suppression, hence the identity check.
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the duration of an ack request in the {@code qmq_pull_ack_timer} timer, tagged
     * with subject and consumer group.
     *
     * @param str  message subject
     * @param str2 consumer group
     * @param j    duration in milliseconds
     */
    public static void monitorAckTime(String str, String str2, long j) {
        final String[] tagValues = {str, str2};
        Metrics.timer("qmq_pull_ack_timer", MetricsConstants.SUBJECT_GROUP_ARRAY, tagValues).update(j, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Logs an ack error and increments the {@code qmq_pull_ack_error} counter, tagged with
     * subject and consumer group.
     *
     * @param str  message subject
     * @param str2 consumer group
     * @param i    error code written to the log
     */
    public static void monitorAckError(String str, String str2, int i) {
        LOGGER.error("ack error. subject={}, group={}, errorCode={}", str, str2, i);
        final String[] tagValues = {str, str2};
        Metrics.counter("qmq_pull_ack_error", MetricsConstants.SUBJECT_GROUP_ARRAY, tagValues).inc();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the ack send info of the queue registered for the given broker group, subject
     * and consumer group.
     *
     * @param brokerGroupInfo broker group of the queue
     * @param str             message subject
     * @param str2            consumer group
     * @return the queue's ack send info, or a new empty {@link AckSendInfo} when no queue is
     *         registered for these coordinates
     */
    public AckSendInfo getAckSendInfo(BrokerGroupInfo brokerGroupInfo, String str, String str2) {
        final String senderKey = MapKeyBuilder.buildSenderKey(brokerGroupInfo.getGroupName(), str, str2);
        final AckSendQueue queue = this.senderMap.get(senderKey);
        if (queue != null) {
            return queue.getAckSendInfo();
        }
        return new AckSendInfo();
    }
}
