package qunar.tc.qmq.consumer.pull;

import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.base.BaseMessage;
import qunar.tc.qmq.broker.BrokerClusterInfo;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.broker.BrokerLoadBalance;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.broker.impl.PollBrokerLoadBalance;
import qunar.tc.qmq.common.ClientType;
import qunar.tc.qmq.common.TimerUtil;
import qunar.tc.qmq.consumer.pull.SendMessageBack;
import qunar.tc.qmq.consumer.pull.exception.SendMessageBackException;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.netty.client.NettyClient;
import qunar.tc.qmq.netty.client.ResponseFuture;
import qunar.tc.qmq.netty.exception.ClientSendException;
import qunar.tc.qmq.protocol.Datagram;
import qunar.tc.qmq.protocol.MessagesPayloadHolder;
import qunar.tc.qmq.protocol.QMQSerializer;
import qunar.tc.qmq.protocol.producer.SendResult;
import qunar.tc.qmq.util.RemotingBuilder;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:qunar/tc/qmq/consumer/pull/SendMessageBackImpl.class */
public class SendMessageBackImpl implements SendMessageBack {
    private static final long SEND_MESSAGE_TIMEOUT = 5000;
    private static final int SEND_BACK_DELAY_SECONDS = 5;
    private final BrokerService brokerService;
    private final BrokerLoadBalance brokerLoadBalance = PollBrokerLoadBalance.getInstance();
    private static final Logger LOGGER = LoggerFactory.getLogger(SendMessageBackImpl.class);
    private static final NettyClient NETTY_CLIENT = NettyClient.getClient();

    /* JADX INFO: Access modifiers changed from: package-private */
    public SendMessageBackImpl(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    @Override // qunar.tc.qmq.consumer.pull.SendMessageBack
    public void sendBack(final BrokerGroupInfo brokerGroupInfo, BaseMessage baseMessage, final SendMessageBack.Callback callback, final ClientType clientType) {
        if (baseMessage == null) {
            if (callback != null) {
                callback.success();
                return;
            }
            return;
        }
        baseMessage.setProperty(BaseMessage.keys.qmq_createTime, new Date().getTime());
        Datagram buildRequestDatagram = RemotingBuilder.buildRequestDatagram((short) 10, new MessagesPayloadHolder(Lists.newArrayList(new BaseMessage[]{baseMessage})));
        final String subject = baseMessage.getSubject();
        try {
            NETTY_CLIENT.sendAsync(brokerGroupInfo.getMaster(), buildRequestDatagram, SEND_MESSAGE_TIMEOUT, new ResponseFuture.Callback() { // from class: qunar.tc.qmq.consumer.pull.SendMessageBackImpl.1
                public void processResponse(ResponseFuture responseFuture) {
                    Datagram response = responseFuture.getResponse();
                    if (!responseFuture.isSendOk() || response == null) {
                        SendMessageBackImpl.monitorSendError(subject, -1);
                        callback.fail(new SendMessageBackException("send fail"));
                    } else {
                        try {
                            handleResponse(response);
                        } finally {
                            response.release();
                        }
                    }
                }

                private void handleResponse(Datagram datagram) {
                    short code = datagram.getHeader().getCode();
                    SendResult sendResult = SendMessageBackImpl.this.getSendResult(datagram);
                    if (sendResult == null) {
                        SendMessageBackImpl.monitorSendError(subject, 100 + code);
                        callback.fail(new SendMessageBackException("responseCode=" + ((int) code)));
                    } else {
                        if (code == 0 && sendResult.getCode() == 0) {
                            callback.success();
                            return;
                        }
                        if (code == 52 || sendResult.getCode() == 4) {
                            brokerGroupInfo.setAvailable(false);
                            SendMessageBackImpl.this.brokerService.refresh(clientType, subject);
                        }
                        SendMessageBackImpl.monitorSendError(subject, 100 + code);
                        callback.fail(new SendMessageBackException("responseCode=" + ((int) code) + ", sendCode=" + sendResult.getCode()));
                    }
                }
            });
        } catch (Exception e) {
            LOGGER.error("send message error. subject={}", subject);
            monitorSendError(subject, -1);
            callback.fail(e);
        } catch (ClientSendException e2) {
            LOGGER.error("send message error. subject={}", subject);
            monitorSendError(subject, e2.getSendErrorCode().ordinal());
            callback.fail(e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public SendResult getSendResult(Datagram datagram) {
        try {
            Map deserializeSendResultMap = QMQSerializer.deserializeSendResultMap(datagram.getBody());
            return deserializeSendResultMap.isEmpty() ? SendResult.OK : (SendResult) deserializeSendResultMap.values().iterator().next();
        } catch (Exception e) {
            LOGGER.error("sendback exception on deserializeSendResultMap.", e);
            return null;
        }
    }

    @Override // qunar.tc.qmq.consumer.pull.SendMessageBack
    public void sendBackAndCompleteNack(int i, final BaseMessage baseMessage, final AckEntry ackEntry) {
        final BrokerClusterInfo clusterBySubject = this.brokerService.getClusterBySubject(ClientType.PRODUCER, baseMessage.getSubject());
        sendBack(this.brokerLoadBalance.loadBalance(clusterBySubject, (BrokerGroupInfo) null), baseMessage, new SendMessageBack.Callback() { // from class: qunar.tc.qmq.consumer.pull.SendMessageBackImpl.2
            private final int retryTooMuch;
            private final AtomicInteger retryNumOnFail = new AtomicInteger(0);

            {
                this.retryTooMuch = clusterBySubject.getGroups().size() * 2;
            }

            @Override // qunar.tc.qmq.consumer.pull.SendMessageBack.Callback
            public void success() {
                ackEntry.completed();
            }

            @Override // qunar.tc.qmq.consumer.pull.SendMessageBack.Callback
            public void fail(Throwable th) {
                if (this.retryNumOnFail.incrementAndGet() > this.retryTooMuch) {
                    if (th instanceof SendMessageBackException) {
                        SendMessageBackImpl.LOGGER.error("send message back fail, and retry {} times after {} seconds. exception: {}", new Object[]{Integer.valueOf(this.retryNumOnFail.get()), Integer.valueOf(SendMessageBackImpl.SEND_BACK_DELAY_SECONDS), th.getMessage()});
                    } else {
                        SendMessageBackImpl.LOGGER.error("send message back fail, and retry {} times after {} seconds", new Object[]{Integer.valueOf(this.retryNumOnFail.get()), Integer.valueOf(SendMessageBackImpl.SEND_BACK_DELAY_SECONDS), th});
                    }
                    TimerUtil.newTimeout(new TimerTask() { // from class: qunar.tc.qmq.consumer.pull.SendMessageBackImpl.2.1
                        public void run(Timeout timeout) {
                            SendMessageBackImpl.this.sendBackAndCompleteNack(baseMessage, this);
                        }
                    }, 5L, TimeUnit.SECONDS);
                    return;
                }
                if (th instanceof SendMessageBackException) {
                    SendMessageBackImpl.LOGGER.error("send message back fail, and retry {} times. exception: {}", new Object[]{Integer.valueOf(this.retryNumOnFail.get()), Integer.valueOf(SendMessageBackImpl.SEND_BACK_DELAY_SECONDS), th.getMessage()});
                } else {
                    SendMessageBackImpl.LOGGER.error("send message back fail, and retry {} times", new Object[]{Integer.valueOf(this.retryNumOnFail.get()), Integer.valueOf(SendMessageBackImpl.SEND_BACK_DELAY_SECONDS), th});
                }
                SendMessageBackImpl.this.sendBackAndCompleteNack(baseMessage, this);
            }
        }, ClientType.PRODUCER);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendBackAndCompleteNack(BaseMessage baseMessage, SendMessageBack.Callback callback) {
        sendBack(this.brokerLoadBalance.loadBalance(this.brokerService.getClusterBySubject(ClientType.PRODUCER, baseMessage.getSubject()), (BrokerGroupInfo) null), baseMessage, callback, ClientType.PRODUCER);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void monitorSendError(String str, int i) {
        Metrics.counter("qmq_pull_send_msg_error", new String[]{"subject", "error"}, new String[]{str, String.valueOf(i)}).inc();
    }
}
