package qunar.tc.qmq.consumer.pull;

import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.broker.BrokerClusterInfo;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.common.ClientType;
import qunar.tc.qmq.common.StatusSource;
import qunar.tc.qmq.common.SwitchWaiter;
import qunar.tc.qmq.concurrent.NamedThreadFactory;
import qunar.tc.qmq.config.PullSubjectsConfig;
import qunar.tc.qmq.consumer.pull.PullService;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.metrics.MetricsConstants;
import qunar.tc.qmq.metrics.QmqCounter;
import qunar.tc.qmq.utils.RetrySubjectUtils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:qunar/tc/qmq/consumer/pull/DefaultPullEntry.class */
public class DefaultPullEntry extends AbstractPullEntry implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(PullEntry.class);
    static final ScheduledExecutorService DELAY_SCHEDULER = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("qmq-delay-scheduler"));
    private static final long PAUSETIME_OF_CLEAN_LAST_MESSAGE = 200;
    private static final long PAUSETIME_OF_NOAVAILABLE_BROKER = 100;
    private static final long PAUSETIME_OF_NOMESSAGE = 500;
    private final PushConsumer pushConsumer;
    private final AtomicReference<Integer> pullBatchSize;
    private final AtomicReference<Integer> pullTimeout;
    private final AtomicReference<Integer> ackNosendLimit;
    private final AtomicBoolean isRunning;
    private final SwitchWaiter onlineSwitcher;
    private final QmqCounter pullRunCounter;
    private final QmqCounter pauseCounter;
    private final PullStrategy pullStrategy;
    private final String brokerGroupName;
    private BrokerGroupInfo brokerGroup;
    private volatile SettableFuture<Boolean> onlineFuture;
    private final Executor executor;
    private static final int PREPARE_PULL = 0;
    private static final int PULL_DONE = 1;
    private final AtomicInteger state;
    private volatile PullParam pullParam;
    private volatile PullService.PullResultFuture pullFuture;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:qunar/tc/qmq/consumer/pull/DefaultPullEntry$RunnableSettableFuture.class */
    public static class RunnableSettableFuture extends AbstractFuture implements Runnable {
        private RunnableSettableFuture() {
        }

        @Override // java.lang.Runnable
        public void run() {
            super.set((Object) null);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultPullEntry(String str, PushConsumer pushConsumer, PullService pullService, AckService ackService, BrokerService brokerService, PullStrategy pullStrategy, SwitchWaiter switchWaiter, Executor executor) {
        super(pushConsumer.subject(), pushConsumer.group(), pullService, ackService, brokerService);
        this.isRunning = new AtomicBoolean(true);
        this.state = new AtomicInteger(PREPARE_PULL);
        this.brokerGroupName = str;
        String subject = pushConsumer.subject();
        String group = pushConsumer.group();
        this.pushConsumer = pushConsumer;
        String realSubject = RetrySubjectUtils.getRealSubject(subject);
        this.pullBatchSize = PullSubjectsConfig.get().getPullBatchSize(realSubject);
        this.pullTimeout = PullSubjectsConfig.get().getPullTimeout(realSubject);
        this.ackNosendLimit = PullSubjectsConfig.get().getAckNosendLimit(realSubject);
        this.pullStrategy = pullStrategy;
        this.onlineSwitcher = switchWaiter;
        this.executor = executor;
        String[] strArr = {subject, group};
        this.pullRunCounter = Metrics.counter("qmq_pull_run_count", MetricsConstants.SUBJECT_GROUP_ARRAY, strArr);
        this.pauseCounter = Metrics.counter("qmq_pull_pause_count", MetricsConstants.SUBJECT_GROUP_ARRAY, strArr);
    }

    @Override // qunar.tc.qmq.consumer.pull.PullEntry
    public void startPull() {
        this.executor.execute(this);
    }

    /* JADX WARN: Finally extract failed */
    @Override // java.lang.Runnable
    public void run() {
        Thread currentThread = Thread.currentThread();
        String name = currentThread.getName();
        currentThread.setName("qmq-pull-entry-" + getSubject() + "-" + getConsumerGroup() + "-" + getBrokerGroup());
        try {
            try {
                switch (this.state.get()) {
                    case PREPARE_PULL /* 0 */:
                        this.brokerGroup = getBrokerGroup();
                        if (!this.isRunning.get()) {
                            currentThread.setName(name);
                            return;
                        }
                        if (!await(waitOnline())) {
                            if (!await(preparePull())) {
                                if (!await(validate())) {
                                    if (!await(waitOnline())) {
                                        this.pullParam = buildPullParam(this.pushConsumer.consumeParam(), this.brokerGroup, this.ackService.getAckSendInfo(this.brokerGroup, getSubject(), getConsumerGroup()), this.pullBatchSize.get().intValue(), this.pullTimeout.get().intValue());
                                        this.pullFuture = this.pullService.pullAsync(this.pullParam);
                                        this.state.set(PULL_DONE);
                                        await(this.pullFuture);
                                        break;
                                    } else {
                                        currentThread.setName(name);
                                        return;
                                    }
                                } else {
                                    currentThread.setName(name);
                                    return;
                                }
                            } else {
                                currentThread.setName(name);
                                return;
                            }
                        } else {
                            currentThread.setName(name);
                            return;
                        }
                    case PULL_DONE /* 1 */:
                        PullParam pullParam = this.pullParam;
                        try {
                            try {
                                List<PulledMessage> handlePullResult = handlePullResult(pullParam, (PullResult) this.pullFuture.get(), this.pushConsumer);
                                markSuccess(this.brokerGroup);
                                this.pullStrategy.record(handlePullResult.size() > 0);
                                this.pushConsumer.push(handlePullResult);
                                this.state.set(PREPARE_PULL);
                                run();
                                break;
                            } catch (Throwable th) {
                                this.state.set(PREPARE_PULL);
                                run();
                                throw th;
                            }
                        } catch (ExecutionException e) {
                            markFailed(this.brokerGroup);
                            if (!(e.getCause() instanceof TimeoutException)) {
                                LOGGER.error("pull message exception. {}", pullParam, e);
                            }
                            this.state.set(PREPARE_PULL);
                            run();
                            break;
                        } catch (Exception e2) {
                            markFailed(this.brokerGroup);
                            LOGGER.error("pull message exception. {}", pullParam, e2);
                            this.state.set(PREPARE_PULL);
                            run();
                            break;
                        }
                }
                currentThread.setName(name);
            } catch (Throwable th2) {
                LOGGER.error("pull error subject {} consumerGroup {}", new Object[]{getSubject(), getConsumerGroup(), th2});
                currentThread.setName(name);
            }
        } catch (Throwable th3) {
            currentThread.setName(name);
            throw th3;
        }
    }

    private boolean await(ListenableFuture listenableFuture) {
        if (listenableFuture == null) {
            return false;
        }
        listenableFuture.addListener(this, this.executor);
        return true;
    }

    private ListenableFuture waitOnline() {
        synchronized (this.onlineSwitcher) {
            if (this.onlineSwitcher.isOnline()) {
                return null;
            }
            SettableFuture<Boolean> create = SettableFuture.create();
            this.onlineFuture = create;
            return create;
        }
    }

    private ListenableFuture preparePull() {
        this.pullRunCounter.inc();
        if (!this.pushConsumer.cleanLocalBuffer()) {
            return delay("wait consumer", PAUSETIME_OF_CLEAN_LAST_MESSAGE);
        }
        if (this.pullStrategy.needPull()) {
            return null;
        }
        return delay("wait consumer", PAUSETIME_OF_NOMESSAGE);
    }

    private ListenableFuture validate() {
        if (BrokerGroupInfo.isInvalid(this.brokerGroup)) {
            return delay("no available broker", PAUSETIME_OF_NOAVAILABLE_BROKER);
        }
        if (this.ackService.getAckSendInfo(this.brokerGroup, getSubject(), getConsumerGroup()).getToSendNum() > this.ackNosendLimit.get().intValue()) {
            return delay("wait ack", PAUSETIME_OF_NOAVAILABLE_BROKER);
        }
        return null;
    }

    private ListenableFuture delay(String str, long j) {
        String subject = getSubject();
        String consumerGroup = getConsumerGroup();
        this.pauseCounter.inc();
        LOGGER.debug("pull pause {} ms, {}. subject={}, consumerGroup={}", new Object[]{Long.valueOf(j), str, subject, consumerGroup});
        RunnableSettableFuture runnableSettableFuture = new RunnableSettableFuture();
        DELAY_SCHEDULER.schedule(runnableSettableFuture, j, TimeUnit.MILLISECONDS);
        return runnableSettableFuture;
    }

    @Override // qunar.tc.qmq.consumer.pull.PullEntry
    public String getSubject() {
        return this.pushConsumer.subject();
    }

    @Override // qunar.tc.qmq.consumer.pull.PullEntry
    public String getConsumerGroup() {
        return this.pushConsumer.group();
    }

    @Override // qunar.tc.qmq.consumer.pull.PullEntry
    public void online(StatusSource statusSource) {
        synchronized (this.onlineSwitcher) {
            SettableFuture<Boolean> settableFuture = this.onlineFuture;
            if (settableFuture == null) {
                return;
            }
            settableFuture.set(true);
        }
    }

    @Override // qunar.tc.qmq.consumer.pull.PullEntry
    public void offline(StatusSource statusSource) {
    }

    @Override // qunar.tc.qmq.consumer.pull.PullEntry
    public void destroy() {
        this.isRunning.set(false);
    }

    private BrokerGroupInfo getBrokerGroup() {
        return getBrokerCluster().getGroupByName(this.brokerGroupName);
    }

    private BrokerClusterInfo getBrokerCluster() {
        return this.brokerService.getClusterBySubject(ClientType.CONSUMER, this.pushConsumer.subject(), this.pushConsumer.group());
    }
}
