package qunar.tc.qmq.producer.sender;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import qunar.tc.qmq.ProduceMessage;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.broker.BrokerLoadBalance;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.broker.impl.PollBrokerLoadBalance;
import qunar.tc.qmq.common.ClientType;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.metrics.MetricsConstants;
import qunar.tc.qmq.metrics.QmqCounter;
import qunar.tc.qmq.metrics.QmqTimer;
import qunar.tc.qmq.netty.exception.BrokerRejectException;
import qunar.tc.qmq.netty.exception.ClientSendException;
import qunar.tc.qmq.netty.exception.RemoteException;
import qunar.tc.qmq.netty.exception.RemoteResponseUnreadableException;
import qunar.tc.qmq.netty.exception.RemoteTimeoutException;
import qunar.tc.qmq.netty.exception.SubjectNotAssignedException;
import qunar.tc.qmq.protocol.Datagram;
import qunar.tc.qmq.protocol.MessagesPayloadHolder;
import qunar.tc.qmq.protocol.QMQSerializer;
import qunar.tc.qmq.protocol.producer.SendResult;
import qunar.tc.qmq.service.exceptions.BlockMessageException;
import qunar.tc.qmq.service.exceptions.DuplicateMessageException;
import qunar.tc.qmq.service.exceptions.MessageException;
import qunar.tc.qmq.tracing.TraceUtil;
import qunar.tc.qmq.util.RemotingBuilder;

/* loaded from: input_file:qunar/tc/qmq/producer/sender/NettyConnection.class */
class NettyConnection implements Connection {
    private final String subject;
    private final ClientType clientType;
    private final NettyProducerClient producerClient;
    private final BrokerService brokerService;
    private volatile BrokerGroupInfo lastSentBroker;
    private final QmqCounter sendMessageCountMetrics;
    private final BrokerLoadBalance brokerLoadBalance = PollBrokerLoadBalance.getInstance();
    private final QmqTimer sendMessageTimerMetrics = Metrics.timer("qmq_client_send_msg_timer");

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyConnection(String str, ClientType clientType, NettyProducerClient nettyProducerClient, BrokerService brokerService) {
        this.subject = str;
        this.clientType = clientType;
        this.producerClient = nettyProducerClient;
        this.brokerService = brokerService;
        this.sendMessageCountMetrics = Metrics.counter("qmq_client_send_msg_count", MetricsConstants.SUBJECT_ARRAY, new String[]{str});
    }

    public void init() {
        this.brokerService.refresh(this.clientType, this.subject);
    }

    @Override // qunar.tc.qmq.producer.sender.Connection
    public Map<String, MessageException> send(List<ProduceMessage> list) throws ClientSendException, RemoteException, BrokerRejectException {
        this.sendMessageCountMetrics.inc(list.size());
        long currentTimeMillis = System.currentTimeMillis();
        try {
            BrokerGroupInfo loadBalance = this.brokerLoadBalance.loadBalance(this.brokerService.getClusterBySubject(this.clientType, this.subject), this.lastSentBroker);
            if (loadBalance == null) {
                throw new ClientSendException(ClientSendException.SendErrorCode.CREATE_CHANNEL_FAIL);
            }
            this.lastSentBroker = loadBalance;
            Datagram doSend = doSend(loadBalance, list);
            switch (doSend.getHeader().getCode()) {
                case 0:
                    Map<String, MessageException> process = process(loadBalance, doSend);
                    this.sendMessageTimerMetrics.update(System.currentTimeMillis() - currentTimeMillis, TimeUnit.MILLISECONDS);
                    return process;
                case 52:
                    handleSendReject(loadBalance);
                    throw new BrokerRejectException("");
                default:
                    throw new RemoteException();
            }
        } catch (Throwable th) {
            this.sendMessageTimerMetrics.update(System.currentTimeMillis() - currentTimeMillis, TimeUnit.MILLISECONDS);
            throw th;
        }
    }

    private void handleSendReject(BrokerGroupInfo brokerGroupInfo) {
        if (brokerGroupInfo != null) {
            brokerGroupInfo.setAvailable(false);
        }
        this.brokerService.refresh(ClientType.PRODUCER, this.subject);
    }

    private Map<String, MessageException> process(BrokerGroupInfo brokerGroupInfo, Datagram datagram) throws RemoteResponseUnreadableException {
        ByteBuf body = datagram.getBody();
        if (body != null) {
            try {
                if (body.isReadable()) {
                    Map deserializeSendResultMap = QMQSerializer.deserializeSendResultMap(body);
                    boolean z = false;
                    HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(deserializeSendResultMap.size());
                    for (Map.Entry entry : deserializeSendResultMap.entrySet()) {
                        String str = (String) entry.getKey();
                        SendResult sendResult = (SendResult) entry.getValue();
                        switch (sendResult.getCode()) {
                            case 0:
                                break;
                            case 1:
                                newHashMapWithExpectedSize.put(str, new MessageException(str, "broker busy"));
                                break;
                            case 2:
                                newHashMapWithExpectedSize.put(str, new DuplicateMessageException(str));
                                break;
                            case 3:
                                newHashMapWithExpectedSize.put(str, new SubjectNotAssignedException(str));
                                break;
                            case 4:
                                z = true;
                                newHashMapWithExpectedSize.put(str, new BrokerRejectException(str));
                                break;
                            case 5:
                                newHashMapWithExpectedSize.put(str, new BlockMessageException(str));
                                break;
                            default:
                                newHashMapWithExpectedSize.put(str, new MessageException(str, sendResult.getRemark()));
                                break;
                        }
                    }
                    if (z) {
                        handleSendReject(brokerGroupInfo);
                    }
                    return newHashMapWithExpectedSize;
                }
            } finally {
                datagram.release();
            }
        }
        Map<String, MessageException> emptyMap = Collections.emptyMap();
        datagram.release();
        return emptyMap;
    }

    private Datagram doSend(BrokerGroupInfo brokerGroupInfo, List<ProduceMessage> list) throws ClientSendException, RemoteTimeoutException {
        try {
            Datagram buildDatagram = buildDatagram(list);
            TraceUtil.setTag("broker", brokerGroupInfo.getGroupName());
            Datagram sendMessage = this.producerClient.sendMessage(brokerGroupInfo, buildDatagram);
            brokerGroupInfo.markSuccess();
            return sendMessage;
        } catch (ClientSendException | RemoteTimeoutException e) {
            brokerGroupInfo.markFailed();
            Metrics.counter("qmq_client_send_msg_error").inc(list.size());
            throw e;
        } catch (Exception e2) {
            brokerGroupInfo.markFailed();
            Metrics.counter("qmq_client_send_msg_error").inc(list.size());
            throw new RuntimeException(e2);
        }
    }

    private Datagram buildDatagram(List<ProduceMessage> list) {
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(list.size());
        Iterator<ProduceMessage> it = list.iterator();
        while (it.hasNext()) {
            newArrayListWithCapacity.add(it.next().getBase());
        }
        return RemotingBuilder.buildRequestDatagram((short) 10, new MessagesPayloadHolder(newArrayListWithCapacity));
    }

    @Override // qunar.tc.qmq.producer.sender.Connection
    public String url() {
        return "newqmq://" + this.subject;
    }

    @Override // qunar.tc.qmq.producer.sender.Connection
    public void destroy() {
    }
}
