package qunar.tc.qmq.consumer.pull;

import com.google.common.base.Supplier;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.base.BaseMessage;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.common.ClientType;
import qunar.tc.qmq.common.TimerUtil;
import qunar.tc.qmq.config.PullSubjectsConfig;
import qunar.tc.qmq.consumer.pull.AckService;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.metrics.MetricsConstants;
import qunar.tc.qmq.metrics.QmqCounter;
import qunar.tc.qmq.metrics.QmqMeter;
import qunar.tc.qmq.utils.RetrySubjectUtils;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:qunar/tc/qmq/consumer/pull/AckSendQueue.class */
public class AckSendQueue implements TimerTask {
    private static final long DEFAULT_PULL_OFFSET = -1;
    private static final int ACK_INTERVAL_SECONDS = 10;
    private static final long ACK_TRY_SEND_TIMEOUT_MILLIS = 1000;
    private static final int DESTROY_CHECK_WAIT_MILLIS = 50;
    private final String brokerGroupName;
    private final String subject;
    private final String group;
    private final AckService ackService;
    private final String retrySubject;
    private final String deadRetrySubject;
    private final AtomicReference<Integer> pullBatchSize;
    private QmqMeter sendNumQps;
    private QmqCounter appendErrorCount;
    private QmqCounter sendErrorCount;
    private QmqCounter sendFailCount;
    private QmqCounter deadQueueCount;
    private final BrokerService brokerService;
    private final SendMessageBack sendMessageBack;
    private final boolean isBroadcast;
    private static final Logger LOGGER = LoggerFactory.getLogger(AckSendQueue.class);
    private static final AckSendEntry EMPTY_ACK = new AckSendEntry();
    private static final AckService.SendAckCallback EMPTY_ACK_CALLBACK = new AckService.SendAckCallback() { // from class: qunar.tc.qmq.consumer.pull.AckSendQueue.5
        AnonymousClass5() {
        }

        @Override // qunar.tc.qmq.consumer.pull.AckService.SendAckCallback
        public void success() {
            AckSendQueue.LOGGER.debug("send heartbeat ok");
        }

        @Override // qunar.tc.qmq.consumer.pull.AckService.SendAckCallback
        public void fail(Exception exc) {
            AckSendQueue.LOGGER.error("send heartbeat fail", exc);
        }
    };
    private final ReentrantLock updateLock = new ReentrantLock();
    private final AtomicLong minPullOffset = new AtomicLong(DEFAULT_PULL_OFFSET);
    private final AtomicLong maxPullOffset = new AtomicLong(DEFAULT_PULL_OFFSET);
    private final AtomicInteger toSendNum = new AtomicInteger(0);
    private final LinkedBlockingQueue<AckSendEntry> sendEntryQueue = new LinkedBlockingQueue<>();
    private final ReentrantLock sendLock = new ReentrantLock();
    private final AtomicBoolean inSending = new AtomicBoolean(false);
    private final RateLimiter ackSendFailLogLimit = RateLimiter.create(0.5d);
    private volatile AckEntry head = null;
    private volatile AckEntry tail = null;
    private volatile AckEntry beginScanPosition = null;
    private volatile long lastAppendOffset = DEFAULT_PULL_OFFSET;
    private volatile long lastSendOkOffset = DEFAULT_PULL_OFFSET;

    /* renamed from: qunar.tc.qmq.consumer.pull.AckSendQueue$1 */
    /* loaded from: input_file:qunar/tc/qmq/consumer/pull/AckSendQueue$1.class */
    public class AnonymousClass1 implements AckService.SendAckCallback {
        final /* synthetic */ AckSendEntry val$sendEntry;

        AnonymousClass1(AckSendEntry ackSendEntry) {
            r5 = ackSendEntry;
        }

        @Override // qunar.tc.qmq.consumer.pull.AckService.SendAckCallback
        public void success() {
            if (AckSendQueue.this.lastSendOkOffset != AckSendQueue.DEFAULT_PULL_OFFSET && AckSendQueue.this.lastSendOkOffset + 1 != r5.getPullOffsetBegin()) {
                AckSendQueue.LOGGER.warn("{}/{} ack send not continous. last={}, send={}", new Object[]{AckSendQueue.this.subject, AckSendQueue.this.group, Long.valueOf(AckSendQueue.this.lastSendOkOffset), r5});
                AckSendQueue.this.sendErrorCount.inc();
            }
            AckSendQueue.access$002(AckSendQueue.this, r5.getPullOffsetLast());
            AckSendQueue.this.minPullOffset.set(r5.getPullOffsetLast() + 1);
            int pullOffsetLast = ((int) (r5.getPullOffsetLast() - r5.getPullOffsetBegin())) + 1;
            AckSendQueue.this.toSendNum.getAndAdd(-pullOffsetLast);
            AckSendQueue.this.sendNumQps.mark(pullOffsetLast);
            AckSendEntry ackSendEntry = (AckSendEntry) AckSendQueue.this.sendEntryQueue.peek();
            if (ackSendEntry == null || ackSendEntry.getPullOffsetBegin() != r5.getPullOffsetBegin()) {
                AckSendQueue.LOGGER.error("ack send error: {}, {}", r5, ackSendEntry);
                AckSendQueue.this.sendErrorCount.inc();
            } else {
                AckSendQueue.LOGGER.debug("AckSendRet ok [{}, {}]", Long.valueOf(r5.getPullOffsetBegin()), Long.valueOf(r5.getPullOffsetLast()));
                AckSendQueue.this.sendEntryQueue.poll();
            }
            AckSendQueue.this.inSending.set(false);
            AckSendQueue.this.sendAck();
        }

        @Override // qunar.tc.qmq.consumer.pull.AckService.SendAckCallback
        public void fail(Exception exc) {
            if (AckSendQueue.this.ackSendFailLogLimit.tryAcquire()) {
                AckSendQueue.LOGGER.warn("send ack fail, will retry next", exc);
            }
            AckSendQueue.LOGGER.debug("AckSendRet fail [{}, {}]", Long.valueOf(r5.getPullOffsetBegin()), Long.valueOf(r5.getPullOffsetLast()));
            AckSendQueue.this.sendFailCount.inc();
            AckSendQueue.this.inSending.set(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: qunar.tc.qmq.consumer.pull.AckSendQueue$2 */
    /* loaded from: input_file:qunar/tc/qmq/consumer/pull/AckSendQueue$2.class */
    public class AnonymousClass2 implements Supplier<Double> {
        AnonymousClass2() {
        }

        /* renamed from: get */
        public Double m17get() {
            return Double.valueOf(AckSendQueue.this.minPullOffset.get());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: qunar.tc.qmq.consumer.pull.AckSendQueue$3 */
    /* loaded from: input_file:qunar/tc/qmq/consumer/pull/AckSendQueue$3.class */
    public class AnonymousClass3 implements Supplier<Double> {
        AnonymousClass3() {
        }

        /* renamed from: get */
        public Double m18get() {
            return Double.valueOf(AckSendQueue.this.maxPullOffset.get());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: qunar.tc.qmq.consumer.pull.AckSendQueue$4 */
    /* loaded from: input_file:qunar/tc/qmq/consumer/pull/AckSendQueue$4.class */
    public class AnonymousClass4 implements Supplier<Double> {
        AnonymousClass4() {
        }

        /* renamed from: get */
        public Double m19get() {
            return Double.valueOf(AckSendQueue.this.toSendNum.get());
        }
    }

    /* renamed from: qunar.tc.qmq.consumer.pull.AckSendQueue$5 */
    /* loaded from: input_file:qunar/tc/qmq/consumer/pull/AckSendQueue$5.class */
    static class AnonymousClass5 implements AckService.SendAckCallback {
        AnonymousClass5() {
        }

        @Override // qunar.tc.qmq.consumer.pull.AckService.SendAckCallback
        public void success() {
            AckSendQueue.LOGGER.debug("send heartbeat ok");
        }

        @Override // qunar.tc.qmq.consumer.pull.AckService.SendAckCallback
        public void fail(Exception exc) {
            AckSendQueue.LOGGER.error("send heartbeat fail", exc);
        }
    }

    public AckSendQueue(String str, String str2, String str3, AckService ackService, BrokerService brokerService, SendMessageBack sendMessageBack, boolean z) {
        this.brokerGroupName = str;
        this.subject = str2;
        this.group = str3;
        this.ackService = ackService;
        this.brokerService = brokerService;
        this.sendMessageBack = sendMessageBack;
        this.isBroadcast = z;
        String realSubject = RetrySubjectUtils.getRealSubject(str2);
        this.retrySubject = RetrySubjectUtils.buildRetrySubject(realSubject, str3);
        this.deadRetrySubject = RetrySubjectUtils.buildDeadRetrySubject(realSubject, str3);
        this.pullBatchSize = PullSubjectsConfig.get().getPullBatchSize(realSubject);
    }

    public void append(List<AckEntry> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        this.updateLock.lock();
        try {
            if (this.lastAppendOffset != DEFAULT_PULL_OFFSET && this.lastAppendOffset + 1 != list.get(0).pullOffset()) {
                LOGGER.warn("{}/{} append ack entry not continous. last: {}, new: {}", new Object[]{this.subject, this.group, Long.valueOf(this.lastAppendOffset), Long.valueOf(list.get(0).pullOffset())});
                this.appendErrorCount.inc();
            }
            if (this.head == null) {
                AckEntry ackEntry = list.get(0);
                this.head = ackEntry;
                this.beginScanPosition = ackEntry;
                this.minPullOffset.set(this.head.pullOffset());
            }
            if (this.tail != null) {
                this.tail.setNext(list.get(0));
            }
            this.tail = list.get(list.size() - 1);
            this.lastAppendOffset = this.tail.pullOffset();
            this.maxPullOffset.set(this.tail.pullOffset());
            this.toSendNum.getAndAdd(list.size());
        } finally {
            this.updateLock.unlock();
        }
    }

    public void sendBackAndCompleteNack(int i, BaseMessage baseMessage, AckEntry ackEntry) {
        String str = i > baseMessage.getMaxRetryNum() ? this.deadRetrySubject : this.retrySubject;
        if (this.deadRetrySubject.equals(str)) {
            this.deadQueueCount.inc();
            LOGGER.warn("process message retry num {} >= {}, and dead retry. subject={}, group={}, msgId={}", new Object[]{Integer.valueOf(i - 1), Integer.valueOf(baseMessage.getMaxRetryNum()), this.subject, this.group, baseMessage.getMessageId()});
        }
        baseMessage.setSubject(str);
        this.sendMessageBack.sendBackAndCompleteNack(i, baseMessage, ackEntry);
    }

    public void ackCompleted(AckEntry ackEntry) {
        if (ackEntry == null) {
            return;
        }
        this.updateLock.lock();
        try {
            if (this.beginScanPosition == null || this.beginScanPosition.pullOffset() != ackEntry.pullOffset()) {
                return;
            }
            AckEntry scanCompleted = scanCompleted(ackEntry);
            this.beginScanPosition = scanCompleted.next();
            if (!allowSendAck(scanCompleted)) {
                this.updateLock.unlock();
                return;
            }
            AckSendEntry ackSendEntry = new AckSendEntry(this.head, scanCompleted, this.isBroadcast);
            this.head = this.beginScanPosition;
            if (this.head == null) {
                this.tail = null;
            }
            this.sendEntryQueue.offer(ackSendEntry);
            this.updateLock.unlock();
            sendAck();
        } finally {
            this.updateLock.unlock();
        }
    }

    private boolean allowSendAck(AckEntry ackEntry) {
        return ackEntry.next() == null || ackEntry.pullOffset() - this.head.pullOffset() >= ((long) (this.pullBatchSize.get().intValue() - 1));
    }

    private AckEntry scanCompleted(AckEntry ackEntry) {
        AckEntry ackEntry2;
        AckEntry ackEntry3 = ackEntry;
        while (true) {
            ackEntry2 = ackEntry3;
            if (ackEntry2.next() == null || !ackEntry2.next().isDone()) {
                break;
            }
            ackEntry3 = ackEntry2.next();
        }
        return ackEntry2;
    }

    public boolean trySendAck(long j) {
        if (!tryLock(j)) {
            return false;
        }
        try {
            if (this.head == null || !this.head.isDone()) {
                boolean sendAck = sendAck();
                this.updateLock.unlock();
                return sendAck;
            }
            AckEntry scanCompleted = scanCompleted(this.head);
            AckSendEntry ackSendEntry = new AckSendEntry(this.head, scanCompleted, this.isBroadcast);
            AckEntry next = scanCompleted.next();
            this.beginScanPosition = next;
            this.head = next;
            if (this.head == null) {
                this.tail = null;
            }
            this.sendEntryQueue.offer(ackSendEntry);
            this.updateLock.unlock();
            return sendAck();
        } catch (Throwable th) {
            this.updateLock.unlock();
            throw th;
        }
    }

    private boolean tryLock(long j) {
        try {
            return this.updateLock.tryLock(j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return false;
        }
    }

    public boolean sendAck() {
        if (this.inSending.get()) {
            return false;
        }
        this.sendLock.lock();
        try {
            if (this.inSending.get() || this.sendEntryQueue.isEmpty()) {
                return false;
            }
            AckSendEntry peek = this.sendEntryQueue.peek();
            if (peek != null) {
                this.inSending.set(true);
                doSendAck(peek);
                return true;
            }
            this.sendEntryQueue.poll();
            LOGGER.error("sendEntry is null");
            return false;
        } finally {
            this.sendLock.unlock();
        }
    }

    private void doSendAck(AckSendEntry ackSendEntry) {
        BrokerGroupInfo brokerGroup = getBrokerGroup();
        if (brokerGroup != null) {
            this.ackService.sendAck(brokerGroup, this.subject, this.group, ackSendEntry, new AckService.SendAckCallback() { // from class: qunar.tc.qmq.consumer.pull.AckSendQueue.1
                final /* synthetic */ AckSendEntry val$sendEntry;

                AnonymousClass1(AckSendEntry ackSendEntry2) {
                    r5 = ackSendEntry2;
                }

                @Override // qunar.tc.qmq.consumer.pull.AckService.SendAckCallback
                public void success() {
                    if (AckSendQueue.this.lastSendOkOffset != AckSendQueue.DEFAULT_PULL_OFFSET && AckSendQueue.this.lastSendOkOffset + 1 != r5.getPullOffsetBegin()) {
                        AckSendQueue.LOGGER.warn("{}/{} ack send not continous. last={}, send={}", new Object[]{AckSendQueue.this.subject, AckSendQueue.this.group, Long.valueOf(AckSendQueue.this.lastSendOkOffset), r5});
                        AckSendQueue.this.sendErrorCount.inc();
                    }
                    AckSendQueue.access$002(AckSendQueue.this, r5.getPullOffsetLast());
                    AckSendQueue.this.minPullOffset.set(r5.getPullOffsetLast() + 1);
                    int pullOffsetLast = ((int) (r5.getPullOffsetLast() - r5.getPullOffsetBegin())) + 1;
                    AckSendQueue.this.toSendNum.getAndAdd(-pullOffsetLast);
                    AckSendQueue.this.sendNumQps.mark(pullOffsetLast);
                    AckSendEntry ackSendEntry2 = (AckSendEntry) AckSendQueue.this.sendEntryQueue.peek();
                    if (ackSendEntry2 == null || ackSendEntry2.getPullOffsetBegin() != r5.getPullOffsetBegin()) {
                        AckSendQueue.LOGGER.error("ack send error: {}, {}", r5, ackSendEntry2);
                        AckSendQueue.this.sendErrorCount.inc();
                    } else {
                        AckSendQueue.LOGGER.debug("AckSendRet ok [{}, {}]", Long.valueOf(r5.getPullOffsetBegin()), Long.valueOf(r5.getPullOffsetLast()));
                        AckSendQueue.this.sendEntryQueue.poll();
                    }
                    AckSendQueue.this.inSending.set(false);
                    AckSendQueue.this.sendAck();
                }

                @Override // qunar.tc.qmq.consumer.pull.AckService.SendAckCallback
                public void fail(Exception exc) {
                    if (AckSendQueue.this.ackSendFailLogLimit.tryAcquire()) {
                        AckSendQueue.LOGGER.warn("send ack fail, will retry next", exc);
                    }
                    AckSendQueue.LOGGER.debug("AckSendRet fail [{}, {}]", Long.valueOf(r5.getPullOffsetBegin()), Long.valueOf(r5.getPullOffsetLast()));
                    AckSendQueue.this.sendFailCount.inc();
                    AckSendQueue.this.inSending.set(false);
                }
            });
        } else {
            LOGGER.debug("lost broker group: {}. subject={}, consumeGroup={}", new Object[]{this.brokerGroupName, this.subject, this.group});
            this.inSending.set(false);
        }
    }

    private BrokerGroupInfo getBrokerGroup() {
        return this.brokerService.getClusterBySubject(ClientType.CONSUMER, this.subject, this.group).getGroupByName(this.brokerGroupName);
    }

    public AckSendInfo getAckSendInfo() {
        AckSendInfo ackSendInfo = new AckSendInfo();
        ackSendInfo.setMinPullOffset(this.minPullOffset.get());
        ackSendInfo.setMaxPullOffset(this.maxPullOffset.get());
        ackSendInfo.setToSendNum(this.toSendNum.get());
        return ackSendInfo;
    }

    public void init() {
        TimerUtil.newTimeout(this, 10L, TimeUnit.SECONDS);
        String[] strArr = {this.subject, this.group};
        this.sendNumQps = Metrics.meter("qmq_pull_ack_sendnum_qps", MetricsConstants.SUBJECT_GROUP_ARRAY, strArr);
        this.appendErrorCount = Metrics.counter("qmq_pull_ack_appenderror_count", MetricsConstants.SUBJECT_GROUP_ARRAY, strArr);
        this.sendErrorCount = Metrics.counter("qmq_pull_ack_senderror_count", MetricsConstants.SUBJECT_GROUP_ARRAY, strArr);
        this.sendFailCount = Metrics.counter("qmq_pull_ack_sendfail_count", MetricsConstants.SUBJECT_GROUP_ARRAY, strArr);
        this.deadQueueCount = Metrics.counter("qmq_deadqueue_send_count", MetricsConstants.SUBJECT_GROUP_ARRAY, strArr);
        Metrics.gauge("qmq_pull_ack_min_offset", MetricsConstants.SUBJECT_GROUP_ARRAY, strArr, new Supplier<Double>() { // from class: qunar.tc.qmq.consumer.pull.AckSendQueue.2
            AnonymousClass2() {
            }

            /* renamed from: get */
            public Double m17get() {
                return Double.valueOf(AckSendQueue.this.minPullOffset.get());
            }
        });
        Metrics.gauge("qmq_pull_ack_max_offset", MetricsConstants.SUBJECT_GROUP_ARRAY, strArr, new Supplier<Double>() { // from class: qunar.tc.qmq.consumer.pull.AckSendQueue.3
            AnonymousClass3() {
            }

            /* renamed from: get */
            public Double m18get() {
                return Double.valueOf(AckSendQueue.this.maxPullOffset.get());
            }
        });
        Metrics.gauge("qmq_pull_ack_tosendnum", MetricsConstants.SUBJECT_GROUP_ARRAY, strArr, new Supplier<Double>() { // from class: qunar.tc.qmq.consumer.pull.AckSendQueue.4
            AnonymousClass4() {
            }

            /* renamed from: get */
            public Double m19get() {
                return Double.valueOf(AckSendQueue.this.toSendNum.get());
            }
        });
    }

    public String getSubject() {
        return this.subject;
    }

    public String getGroup() {
        return this.group;
    }

    public void run(Timeout timeout) {
        try {
            if (!trySendAck(ACK_TRY_SEND_TIMEOUT_MILLIS)) {
                BrokerGroupInfo brokerGroup = getBrokerGroup();
                if (brokerGroup == null) {
                    LOGGER.debug("lost broker group: {}. subject={}, consumeGroup={}", new Object[]{this.brokerGroupName, this.subject, this.group});
                    return;
                }
                this.ackService.sendAck(brokerGroup, this.subject, this.group, EMPTY_ACK, EMPTY_ACK_CALLBACK);
            }
        } finally {
            TimerUtil.newTimeout(this, 10L, TimeUnit.SECONDS);
        }
    }

    public void destroy(long j) {
        while (j > 0 && this.toSendNum.get() > 0) {
            try {
                Thread.sleep(50L);
                j -= 50;
            } catch (Exception e) {
                return;
            }
        }
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: qunar.tc.qmq.consumer.pull.AckSendQueue.access$002(qunar.tc.qmq.consumer.pull.AckSendQueue, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$002(qunar.tc.qmq.consumer.pull.AckSendQueue r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.lastSendOkOffset = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: qunar.tc.qmq.consumer.pull.AckSendQueue.access$002(qunar.tc.qmq.consumer.pull.AckSendQueue, long):long");
    }

    static {
    }
}
