package qunar.tc.qmq.consumer.pull;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractFuture;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.base.BaseMessage;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.config.PullSubjectsConfig;
import qunar.tc.qmq.consumer.pull.exception.PullException;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.metrics.MetricsConstants;
import qunar.tc.qmq.netty.client.NettyClient;
import qunar.tc.qmq.netty.client.ResponseFuture;
import qunar.tc.qmq.protocol.Datagram;
import qunar.tc.qmq.protocol.consumer.PullRequest;
import qunar.tc.qmq.protocol.consumer.PullRequestPayloadHolder;
import qunar.tc.qmq.util.RemotingBuilder;
import qunar.tc.qmq.utils.Flags;
import qunar.tc.qmq.utils.PayloadHolderUtils;
import qunar.tc.qmq.utils.RetrySubjectUtils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:qunar/tc/qmq/consumer/pull/PullService.class */
/**
 * Sends pull requests to the master of a broker group and converts the responses into
 * {@link BaseMessage} lists.
 *
 * <p>Both the blocking {@link #pull(PullParam)} and the non-blocking
 * {@link #pullAsync(PullParam)} entry points are built on the same asynchronous send; the
 * result is delivered through a {@link PullCallback}.
 */
public class PullService {
    private static final Logger LOGGER = LoggerFactory.getLogger(PullService.class);

    /**
     * Command code written into every request datagram built by this class.
     * NOTE(review): presumably the protocol's "pull message" command; confirm against the
     * protocol's command-code table.
     */
    private static final short REQUEST_CODE_PULL = 11;

    private final NettyClient client = NettyClient.getClient();

    /** Receives the outcome of a single pull request. */
    public interface PullCallback {
        /**
         * Called when a response datagram was received and handled.
         *
         * @param code     the code taken from the response header
         * @param messages the messages decoded from the response; empty when the code is non-zero
         */
        void onCompleted(short code, List<BaseMessage> messages);

        /**
         * Called when the request could not be sent, no response arrived, or handling the
         * response failed.
         *
         * @param e the failure
         */
        void onException(Exception e);
    }

    /**
     * Adapts the transport-level {@link ResponseFuture.Callback} to a {@link PullCallback}:
     * records metrics, decodes the response body and releases the response datagram.
     */
    public static final class PullCallbackWrapper implements ResponseFuture.Callback {
        /** Response header code for which the body is decoded into messages. */
        private static final short RESPONSE_CODE_OK = 0;

        /**
         * Response header code that is reported as an empty result without counting an error.
         * NOTE(review): presumably the protocol's "no message" code; confirm.
         */
        private static final short RESPONSE_CODE_EMPTY = 4;

        /**
         * Number of bytes skipped after each message's flag byte; this client does not use them.
         * NOTE(review): their content is not visible from this file.
         */
        private static final int SKIPPED_HEADER_BYTES = 16;

        private final PullRequest request;
        private final PullCallback callback;

        PullCallbackWrapper(PullRequest pullRequest, PullCallback pullCallback) {
            this.request = pullRequest;
            this.callback = pullCallback;
        }

        /**
         * Handles the completed transport future. Any exception raised while handling it is
         * counted as a pull error and forwarded to {@link PullCallback#onException(Exception)}.
         *
         * @param responseFuture the completed transport future
         */
        public void processResponse(ResponseFuture responseFuture) {
            try {
                doProcessResponse(responseFuture);
            } catch (Exception e) {
                monitorPullError(request.getSubject(), request.getGroup());
                callback.onException(e);
            }
        }

        /**
         * Records the request cost time, then dispatches on the state of the future:
         * send failure, response received, timeout, or missing response.
         */
        private void doProcessResponse(ResponseFuture responseFuture) {
            final String subject = request.getSubject();
            final String group = request.getGroup();
            monitorPullTime(subject, group, responseFuture.getRequestCostTime());

            if (!responseFuture.isSendOk()) {
                monitorPullError(subject, group);
                callback.onException(new PullException("send fail. opaque=" + responseFuture.getOpaque()));
                return;
            }

            final Datagram response = responseFuture.getResponse();
            if (response == null) {
                monitorPullError(subject, group);
                if (responseFuture.isTimeout()) {
                    callback.onException(new TimeoutException("pull message " + subject
                            + " timeout response: opaque=" + responseFuture.getOpaque() + ". " + responseFuture));
                } else {
                    // Fixed: the original message ran the subject and "no receive" together.
                    callback.onException(new PullException("pull message " + subject
                            + " no receive response: opaque=" + responseFuture.getOpaque() + ". " + responseFuture));
                }
                return;
            }

            try {
                handleResponse(response);
            } finally {
                // Always release the datagram, even when decoding or the callback throws.
                response.release();
            }
        }

        /**
         * Completes the callback according to the response header code. Only
         * {@link #RESPONSE_CODE_OK} leads to body decoding; every other code yields an empty
         * list, and every code other than {@link #RESPONSE_CODE_EMPTY} also counts as an error.
         */
        private void handleResponse(Datagram datagram) {
            final short code = datagram.getHeader().getCode();
            if (code != RESPONSE_CODE_OK) {
                if (code != RESPONSE_CODE_EMPTY) {
                    monitorPullError(request.getSubject(), request.getGroup());
                }
                callback.onCompleted(code, Collections.<BaseMessage>emptyList());
                return;
            }

            final List<BaseMessage> messages = deserializeBaseMessage(datagram.getBody());
            monitorPullCount(request.getSubject(), request.getGroup(), messages.size());
            for (BaseMessage message : messages) {
                // A negative retry limit is replaced by the configured limit of the real subject.
                if (message.getMaxRetryNum() < 0) {
                    final String realSubject = RetrySubjectUtils.getRealSubject(message.getSubject());
                    message.setMaxRetryNum(PullSubjectsConfig.get().getMaxRetryNum(realSubject).get().intValue());
                }
            }
            callback.onCompleted(code, messages);
        }

        /**
         * Decodes the response body into messages.
         *
         * <p>Layout as read here: one long giving the pull offset of the first message, one long
         * that is ignored, then for each message a flag byte, {@link #SKIPPED_HEADER_BYTES}
         * skipped bytes, the subject, the message id, optional tags, an int body length and the
         * body itself. Each message gets the {@code qmq_pullOffset} property, increasing by one
         * per message.
         *
         * @param body the response body
         * @return the decoded messages; empty when the body has no readable bytes
         */
        private List<BaseMessage> deserializeBaseMessage(ByteBuf body) {
            if (body.readableBytes() == 0) {
                return Collections.emptyList();
            }

            final List<BaseMessage> messages = Lists.newArrayList();
            long pullOffset = body.readLong();
            body.readLong(); // second leading long is not used by this client

            while (body.isReadable()) {
                final BaseMessage message = new BaseMessage();
                final byte flag = body.readByte();
                body.skipBytes(SKIPPED_HEADER_BYTES);
                final String subject = PayloadHolderUtils.readString(body);
                final String messageId = PayloadHolderUtils.readString(body);
                readTags(body, message, flag);

                final int attrsLength = body.readInt();
                final ByteBuf attrsSlice = body.readSlice(attrsLength);
                final HashMap<String, Object> attrs = deserializeMapWrapper(subject, messageId, attrsSlice);

                message.setMessageId(messageId);
                message.setSubject(subject);
                message.setAttrs(attrs);
                message.setProperty(BaseMessage.keys.qmq_pullOffset, pullOffset);
                messages.add(message);
                pullOffset++;
            }
            return messages;
        }

        /**
         * Reads the tag section into the message when the flag byte says tags are present:
         * one count byte followed by that many strings.
         */
        private void readTags(ByteBuf body, BaseMessage message, byte flag) {
            if (!Flags.hasTags(flag)) {
                return;
            }
            final int tagCount = body.readByte();
            for (int i = 0; i < tagCount; i++) {
                message.addTag(PayloadHolderUtils.readString(body));
            }
        }

        /**
         * Decodes the attribute map of one message. On failure the error is logged with its
         * cause, a failure counter is incremented, and a substitute map is returned that marks
         * the message as corrupt and carries the current time as its create time.
         */
        private HashMap<String, Object> deserializeMapWrapper(String subject, String messageId, ByteBuf attrsSlice) {
            try {
                return deserializeMap(attrsSlice);
            } catch (Exception e) {
                // Trailing throwable argument makes SLF4J log the stack trace as well.
                LOGGER.error("deserialize message failed subject:{} messageId: {}", subject, messageId, e);
                Metrics.counter("qmq_pull_deserialize_fail_count", MetricsConstants.SUBJECT_ARRAY, new String[]{subject}).inc();

                final HashMap<String, Object> corrupt = new HashMap<>();
                corrupt.put(BaseMessage.keys.qmq_corruptData.name(), "true");
                corrupt.put(BaseMessage.keys.qmq_createTime.name(), Long.valueOf(System.currentTimeMillis()));
                return corrupt;
            }
        }

        /** Reads key/value string pairs for as long as at least four bytes remain readable. */
        private HashMap<String, Object> deserializeMap(ByteBuf attrsSlice) {
            final HashMap<String, Object> attrs = new HashMap<>();
            while (attrsSlice.isReadable(4)) {
                final String key = PayloadHolderUtils.readString(attrsSlice);
                final String value = PayloadHolderUtils.readString(attrsSlice);
                attrs.put(key, value);
            }
            return attrs;
        }
    }

    /**
     * A future that completes with a {@link PullResult} when used as the {@link PullCallback}
     * of a pull request.
     */
    public static final class PullResultFuture extends AbstractFuture<PullResult> implements PullCallback {
        private final BrokerGroupInfo brokerGroup;

        PullResultFuture(BrokerGroupInfo brokerGroupInfo) {
            this.brokerGroup = brokerGroupInfo;
        }

        /** Completes the future with a result built from the code, the messages and the broker group. */
        @Override // qunar.tc.qmq.consumer.pull.PullService.PullCallback
        public void onCompleted(short s, List<BaseMessage> list) {
            set(new PullResult(s, list, brokerGroup));
        }

        /** Fails the future with the given exception. */
        @Override // qunar.tc.qmq.consumer.pull.PullService.PullCallback
        public void onException(Exception exc) {
            setException(exc);
        }
    }

    /**
     * Sends a pull request and blocks until its result is available.
     *
     * @param pullParam the pull parameters
     * @return the pull result
     * @throws ExecutionException   if the pull failed; the cause is the reported exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public PullResult pull(PullParam pullParam) throws ExecutionException, InterruptedException {
        final PullResultFuture future = new PullResultFuture(pullParam.getBrokerGroup());
        pull(pullParam, future);
        return future.get();
    }

    /**
     * Sends a pull request without waiting for the result.
     *
     * @param pullParam the pull parameters
     * @return a future that completes with the pull result or fails with the reported exception
     */
    public PullResultFuture pullAsync(PullParam pullParam) {
        final PullResultFuture future = new PullResultFuture(pullParam.getBrokerGroup());
        pull(pullParam, future);
        return future;
    }

    /**
     * Builds the request datagram and sends it asynchronously to the broker group's master.
     * An exception thrown while sending is counted as a pull error and reported to the callback.
     */
    private void pull(PullParam pullParam, PullCallback pullCallback) {
        final PullRequest request = buildPullRequest(pullParam);
        final Datagram datagram = RemotingBuilder.buildRequestDatagram(REQUEST_CODE_PULL, new PullRequestPayloadHolder(request));

        // A negative timeoutMillis means only the request timeout applies; otherwise both add up.
        final long requestTimeoutMillis = pullParam.getRequestTimeoutMillis();
        final long timeoutMillis = pullParam.getTimeoutMillis();
        final long responseTimeoutMillis = timeoutMillis < 0 ? requestTimeoutMillis : requestTimeoutMillis + timeoutMillis;

        try {
            client.sendAsync(pullParam.getBrokerGroup().getMaster(), datagram, responseTimeoutMillis,
                    new PullCallbackWrapper(request, pullCallback));
        } catch (Exception e) {
            monitorPullError(pullParam.getSubject(), pullParam.getGroup());
            pullCallback.onException(e);
        }
    }

    /** Copies the fields of the pull parameters into a new {@link PullRequest}. */
    private PullRequest buildPullRequest(PullParam pullParam) {
        final PullRequest request = new PullRequest();
        request.setSubject(pullParam.getSubject());
        request.setGroup(pullParam.getGroup());
        request.setRequestNum(pullParam.getPullBatchSize());
        request.setTimeoutMillis(pullParam.getTimeoutMillis());
        request.setOffset(pullParam.getConsumeOffset());
        request.setPullOffsetBegin(pullParam.getMinPullOffset());
        request.setPullOffsetLast(pullParam.getMaxPullOffset());
        request.setConsumerId(pullParam.getConsumerId());
        request.setBroadcast(pullParam.isBroadcast());
        request.setFilters(pullParam.getFilters());
        return request;
    }

    /**
     * Updates the {@code qmq_pull_timer} timer for the subject/group pair.
     *
     * @param subject    the subject tag value
     * @param group      the group tag value
     * @param costMillis the elapsed time, in milliseconds
     */
    public static void monitorPullTime(String subject, String group, long costMillis) {
        Metrics.timer("qmq_pull_timer", MetricsConstants.SUBJECT_GROUP_ARRAY, new String[]{subject, group})
                .update(costMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Increments the {@code qmq_pull_error} counter for the subject/group pair.
     *
     * @param subject the subject tag value
     * @param group   the group tag value
     */
    public static void monitorPullError(String subject, String group) {
        Metrics.counter("qmq_pull_error", MetricsConstants.SUBJECT_GROUP_ARRAY, new String[]{subject, group}).inc();
    }

    /**
     * Adds to the {@code qmq_pull_count} counter for the subject/group pair.
     *
     * @param subject the subject tag value
     * @param group   the group tag value
     * @param count   the number of pulled messages to add
     */
    public static void monitorPullCount(String subject, String group, int count) {
        Metrics.counter("qmq_pull_count", MetricsConstants.SUBJECT_GROUP_ARRAY, new String[]{subject, group}).inc(count);
    }
}
