package qunar.tc.qmq.consumer.pull;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.broker.BrokerClusterInfo;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.common.ClientType;
import qunar.tc.qmq.common.StatusSource;
import qunar.tc.qmq.common.SwitchWaiter;

/* loaded from: input_file:qunar/tc/qmq/consumer/pull/ParallelPullEntry.class */
/**
 * A {@link PullEntry} that maintains one {@link DefaultPullEntry} per broker group of the
 * subject's consumer cluster.
 *
 * <p>Once started, {@link #run()} is re-executed on the supplied {@link Executor}: each pass looks
 * up the current cluster for the subject/group, creates and starts a child entry for every broker
 * group that has none yet, destroys child entries whose broker group is no longer part of the
 * cluster, and then schedules the next pass after {@link #REFRESH_INTERVAL_MILLIS}. While the
 * shared online switch is off, the pass is postponed until {@link #online(StatusSource)} is called.
 */
public class ParallelPullEntry implements PullEntry, Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelPullEntry.class);

    /** Delay between two consecutive reconcile passes, in milliseconds. */
    private static final long REFRESH_INTERVAL_MILLIS = 1000L;

    /** Child pull entries, keyed by broker group name. */
    private final Map<String, PullEntry> pullEntryMap = Maps.newConcurrentMap();
    /** Online switch; the same instance is handed to every child entry and also used as the lock for online/offline transitions. */
    private final SwitchWaiter onlineSwitcher = new SwitchWaiter(false);
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private final PushConsumerParam pushConsumerParam;
    private final PullService pullService;
    private final AckService ackService;
    private final BrokerService brokerService;
    private final PullStrategy pullStrategy;
    private final Executor executor;
    /** Most recent future handed out by {@link #waitOnline()}; completed by {@link #online(StatusSource)}. */
    private volatile SettableFuture<Boolean> onlineFuture;

    /**
     * A future that completes itself (with {@code null}) when it is run, so it can be handed to a
     * scheduler as the task and to a caller as the completion signal at the same time.
     */
    public static class RunnableSettableFuture extends AbstractFuture<Void> implements Runnable {
        private RunnableSettableFuture() {
        }

        @Override // java.lang.Runnable
        public void run() {
            super.set(null);
        }
    }

    /**
     * @param pushConsumerParam supplies subject, consumer group and registration parameters
     * @param pullService       passed through to every child entry
     * @param ackService        passed through to every child entry
     * @param brokerService     used to look up the broker cluster; also passed to child entries
     * @param pullStrategy      passed through to every child entry
     * @param executor          runs the reconcile passes; also passed to child entries
     */
    public ParallelPullEntry(PushConsumerParam pushConsumerParam, PullService pullService, AckService ackService, BrokerService brokerService, PullStrategy pullStrategy, Executor executor) {
        this.pushConsumerParam = pushConsumerParam;
        this.pullService = pullService;
        this.ackService = ackService;
        this.brokerService = brokerService;
        this.pullStrategy = pullStrategy;
        this.executor = executor;
    }

    @Override // qunar.tc.qmq.consumer.pull.PullEntry
    public String getSubject() {
        return this.pushConsumerParam.getSubject();
    }

    @Override // qunar.tc.qmq.consumer.pull.PullEntry
    public String getConsumerGroup() {
        return this.pushConsumerParam.getGroup();
    }

    /**
     * Turns the online switch on for {@code statusSource}, completes the pending online future (if
     * any) and forwards the call to every child entry.
     */
    @Override // qunar.tc.qmq.consumer.pull.PullEntry
    public void online(StatusSource statusSource) {
        synchronized (this.onlineSwitcher) {
            this.onlineSwitcher.on(statusSource);
            SettableFuture<Boolean> pending = this.onlineFuture;
            if (pending == null) {
                // NOTE(review): when no run() has ever waited for online, this returns before the
                // log line and before notifying the child entries. Behaviour kept as-is; confirm
                // against DefaultPullEntry whether children can be left waiting in that case.
                return;
            }
            pending.set(true);
            LOGGER.info("pull consumer online. subject={}, group={}", getSubject(), getConsumerGroup());
            this.pullEntryMap.values().forEach(pullEntry -> pullEntry.online(statusSource));
        }
    }

    /** Turns the online switch off for {@code statusSource} and forwards the call to every child entry. */
    @Override // qunar.tc.qmq.consumer.pull.PullEntry
    public void offline(StatusSource statusSource) {
        synchronized (this.onlineSwitcher) {
            this.onlineSwitcher.off(statusSource);
        }
        LOGGER.info("pull consumer offline. subject={}, group={}", getSubject(), getConsumerGroup());
        this.pullEntryMap.values().forEach(pullEntry -> pullEntry.offline(statusSource));
    }

    /** Submits the first reconcile pass to the executor; has no effect when already started. */
    @Override // qunar.tc.qmq.consumer.pull.PullEntry
    public void startPull() {
        if (this.isStarted.compareAndSet(false, true)) {
            this.executor.execute(this);
        }
    }

    /** Clears the started flag, so later {@link #run()} invocations do nothing, and destroys all child entries. */
    @Override // qunar.tc.qmq.consumer.pull.PullEntry
    public void destroy() {
        this.isStarted.set(false);
        this.pullEntryMap.values().forEach(PullEntry::destroy);
    }

    /**
     * Registers this entry to be re-run on the executor once {@code future} completes.
     *
     * @param future the future to wait for, or {@code null} when there is nothing to wait for
     * @return {@code true} if a listener was registered, {@code false} if {@code future} was {@code null}
     */
    private boolean await(ListenableFuture<?> future) {
        if (future == null) {
            return false;
        }
        future.addListener(this, this.executor);
        return true;
    }

    /**
     * @return {@code null} when the switch is currently online; otherwise a fresh future, also
     *         stored in {@link #onlineFuture}, that {@link #online(StatusSource)} completes
     */
    private ListenableFuture<Boolean> waitOnline() {
        synchronized (this.onlineSwitcher) {
            if (this.onlineSwitcher.isOnline()) {
                return null;
            }
            SettableFuture<Boolean> future = SettableFuture.create();
            this.onlineFuture = future;
            return future;
        }
    }

    /** Returns a future that is scheduled on {@code DefaultPullEntry.DELAY_SCHEDULER} to complete after {@code j} milliseconds. */
    private ListenableFuture<Void> delay(long j) {
        RunnableSettableFuture timer = new RunnableSettableFuture();
        DefaultPullEntry.DELAY_SCHEDULER.schedule(timer, j, TimeUnit.MILLISECONDS);
        return timer;
    }

    /**
     * Executes one reconcile pass and schedules the next one.
     *
     * <p>Does nothing when the entry is not started. When the switch is offline, only registers
     * itself to be re-run once it goes online. Otherwise the pass runs under a descriptive thread
     * name; the original thread name is restored and exactly one follow-up pass is scheduled in
     * the {@code finally} block, whether the pass completed, returned early or threw.
     */
    @Override // java.lang.Runnable
    public void run() {
        if (!this.isStarted.get()) {
            return;
        }
        if (await(waitOnline())) {
            return;
        }
        Thread currentThread = Thread.currentThread();
        String originalName = currentThread.getName();
        currentThread.setName("qmq-pull-entry-" + getSubject() + "-" + getConsumerGroup());
        try {
            syncPullEntries();
        } finally {
            // Reschedule only here: doing it in the try body as well would register two listeners
            // per pass and double the number of scheduled passes on every cycle.
            currentThread.setName(originalName);
            await(delay(REFRESH_INTERVAL_MILLIS));
        }
    }

    /** Aligns {@link #pullEntryMap} with the broker groups currently reported for the subject/group. */
    private void syncPullEntries() {
        BrokerClusterInfo cluster = this.brokerService.getClusterBySubject(ClientType.CONSUMER, getSubject(), getConsumerGroup());
        List<BrokerGroupInfo> groups = cluster.getGroups();
        if (groups == null || groups.isEmpty()) {
            // No broker groups reported: leave existing child entries untouched.
            return;
        }
        for (BrokerGroupInfo group : groups) {
            this.pullEntryMap.computeIfAbsent(group.getGroupName(), this::createPullEntry);
        }
        // Iterate over a snapshot so entries can be removed from the live map while looping.
        for (Map.Entry<String, PullEntry> entry : new HashMap<>(this.pullEntryMap).entrySet()) {
            if (cluster.getGroupByName(entry.getKey()) == null) {
                entry.getValue().destroy();
                this.pullEntryMap.remove(entry.getKey());
            }
        }
    }

    /** Creates and starts the child entry for the broker group named {@code brokerGroupName}. */
    private PullEntry createPullEntry(String brokerGroupName) {
        DefaultPullEntry child = new DefaultPullEntry(brokerGroupName, new PushConsumerImpl(this.pushConsumerParam.getSubject(), this.pushConsumerParam.getGroup(), this.pushConsumerParam.getRegistParam()), this.pullService, this.ackService, this.brokerService, this.pullStrategy, this.onlineSwitcher, this.executor);
        child.startPull();
        return child;
    }
}
