package qunar.tc.qmq.consumer.pull;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.config.PullSubjectsConfig;
import qunar.tc.qmq.consumer.pull.PlainPullEntry;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.metrics.MetricsConstants;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:qunar/tc/qmq/consumer/pull/DefaultPullConsumer.class */
public class DefaultPullConsumer extends AbstractPullConsumer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPullConsumer.class);
    private static final int POLL_TIMEOUT_MILLIS = 1000;
    private static final int MAX_FETCH_SIZE = 10000;
    private static final int MAX_TIMEOUT = 300000;
    private volatile boolean isStop;
    private final LinkedBlockingQueue<PullMessageFuture> requestQueue;
    private final LinkedBlockingQueue<Message> localBuffer;
    private final int preFetchSize;
    private final int lowWaterMark;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultPullConsumer(String str, String str2, boolean z, String str3, PullService pullService, AckService ackService, BrokerService brokerService) {
        super(str, str2, z, str3, pullService, ackService, brokerService);
        this.isStop = false;
        this.requestQueue = new LinkedBlockingQueue<>();
        this.localBuffer = new LinkedBlockingQueue<>();
        this.preFetchSize = PullSubjectsConfig.get().getPullBatchSize(str).get().intValue();
        this.lowWaterMark = Math.round(this.preFetchSize * 0.2f);
    }

    @Override // qunar.tc.qmq.consumer.pull.AbstractPullConsumer
    PullMessageFuture newFuture(int i, long j, boolean z) {
        if (i > MAX_FETCH_SIZE) {
            i = MAX_FETCH_SIZE;
        }
        if (j > 300000) {
            j = 300000;
        }
        if (i <= 0) {
            PullMessageFuture pullMessageFuture = new PullMessageFuture(i, i, j, z);
            pullMessageFuture.set(Collections.emptyList());
            return pullMessageFuture;
        }
        if (this.localBuffer.size() == 0) {
            PullMessageFuture pullMessageFuture2 = new PullMessageFuture(i, Math.max(i, this.preFetchSize), j, z);
            this.requestQueue.offer(pullMessageFuture2);
            return pullMessageFuture2;
        }
        if (this.localBuffer.size() >= i) {
            ArrayList arrayList = new ArrayList(i);
            this.localBuffer.drainTo(arrayList, i);
            if (arrayList.size() > 0) {
                checkLowWaterMark();
                PullMessageFuture pullMessageFuture3 = new PullMessageFuture(i, i, j, z);
                pullMessageFuture3.set((List<Message>) arrayList);
                return pullMessageFuture3;
            }
        }
        PullMessageFuture pullMessageFuture4 = new PullMessageFuture(i, Math.max(this.preFetchSize, i - this.localBuffer.size()), j, z);
        this.requestQueue.offer(pullMessageFuture4);
        return pullMessageFuture4;
    }

    private void checkLowWaterMark() {
        int size = this.localBuffer.size();
        if (size < this.lowWaterMark) {
            this.requestQueue.offer(new PullMessageFuture(0, this.preFetchSize - size, -1L, false));
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.isStop) {
            try {
                if (this.onlineSwitcher.waitOn()) {
                    PullMessageFuture poll = this.requestQueue.poll(1000L, TimeUnit.MILLISECONDS);
                    if (poll != null) {
                        doPull(poll);
                    }
                }
            } catch (InterruptedException e) {
                LOGGER.error("pullConsumer poll be interrupted. subject={}, group={}", new Object[]{subject(), group(), e});
            } catch (Exception e2) {
                LOGGER.error("pullConsumer poll exception. subject={}, group={}", new Object[]{subject(), group(), e2});
            }
        }
    }

    private void doPull(PullMessageFuture pullMessageFuture) {
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(pullMessageFuture.getFetchSize());
        try {
            try {
                this.retryPullEntry.pull(pullMessageFuture.getFetchSize(), -1, newArrayListWithCapacity);
                if (newArrayListWithCapacity.size() > 0 && pullMessageFuture.isPullOnce()) {
                    setResult(pullMessageFuture, newArrayListWithCapacity);
                    return;
                }
                if (pullMessageFuture.isResetCreateTime()) {
                    pullMessageFuture.resetCreateTime();
                }
                do {
                    int fetchSize = pullMessageFuture.getFetchSize() - newArrayListWithCapacity.size();
                    if (fetchSize <= 0 || this.pullEntry.pull(fetchSize, pullMessageFuture.getTimeout(), newArrayListWithCapacity) == PlainPullEntry.PlainPullResult.NO_BROKER || newArrayListWithCapacity.size() >= pullMessageFuture.getFetchSize()) {
                        break;
                    }
                } while (!pullMessageFuture.isExpired());
                setResult(pullMessageFuture, newArrayListWithCapacity);
            } catch (Exception e) {
                LOGGER.error("DefaultPullConsumer doPull exception. subject={}, group={}", new Object[]{subject(), group(), e});
                Metrics.counter("qmq_pull_defaultPull_doPull_fail", MetricsConstants.SUBJECT_GROUP_ARRAY, new String[]{subject(), group()}).inc();
                setResult(pullMessageFuture, newArrayListWithCapacity);
            }
        } catch (Throwable th) {
            setResult(pullMessageFuture, newArrayListWithCapacity);
            throw th;
        }
    }

    private void setResult(PullMessageFuture pullMessageFuture, List<Message> list) {
        int expectedSize = pullMessageFuture.getExpectedSize();
        if (expectedSize <= 0) {
            this.localBuffer.addAll(list);
            pullMessageFuture.set(Collections.emptyList());
            return;
        }
        ArrayList arrayList = new ArrayList(expectedSize);
        int size = this.localBuffer.size();
        if (size > 0) {
            this.localBuffer.drainTo(arrayList, Math.min(expectedSize, size));
        }
        int size2 = expectedSize - arrayList.size();
        if (size2 <= 0) {
            this.localBuffer.addAll(list);
            pullMessageFuture.set((List<Message>) arrayList);
        } else {
            arrayList.addAll(head(list, size2));
            this.localBuffer.addAll(tail(list, size2));
            pullMessageFuture.set((List<Message>) arrayList);
        }
    }

    private List<Message> head(List<Message> list, int i) {
        return i >= list.size() ? list : list.subList(0, i);
    }

    private List<Message> tail(List<Message> list, int i) {
        return i >= list.size() ? Collections.emptyList() : list.subList(i, list.size());
    }

    public void close() {
        this.isStop = true;
    }
}
