package qunar.tc.qmq.consumer.pull;

import com.google.common.base.Strings;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.PullConsumer;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.broker.impl.BrokerServiceImpl;
import qunar.tc.qmq.common.ClientInfo;
import qunar.tc.qmq.common.EnvProvider;
import qunar.tc.qmq.common.MapKeyBuilder;
import qunar.tc.qmq.common.StatusSource;
import qunar.tc.qmq.concurrent.NamedThreadFactory;
import qunar.tc.qmq.consumer.exception.DuplicateListenerException;
import qunar.tc.qmq.consumer.register.ConsumerRegister;
import qunar.tc.qmq.consumer.register.RegistParam;
import qunar.tc.qmq.metainfoclient.ConsumerStateChangedListener;
import qunar.tc.qmq.metainfoclient.MetaInfoService;
import qunar.tc.qmq.protocol.consumer.SubEnvIsolationPullFilter;
import qunar.tc.qmq.utils.RetrySubjectUtils;

/* loaded from: input_file:qunar/tc/qmq/consumer/pull/PullRegister.class */
public class PullRegister implements ConsumerRegister, ConsumerStateChangedListener {
    private static final Logger LOG = LoggerFactory.getLogger(PullRegister.class);
    private volatile boolean isOnline = false;
    private final Map<String, PullEntry> pullEntryMap = new ConcurrentHashMap();
    private final Map<String, DefaultPullConsumer> pullConsumerMap = new ConcurrentHashMap();
    private final ExecutorService pullExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("qmq-pull"));
    private final ReentrantLock onlineLock = new ReentrantLock();
    private final MetaInfoService metaInfoService = new MetaInfoService();
    private final BrokerService brokerService = new BrokerServiceImpl(this.metaInfoService);
    private final PullService pullService = new PullService();
    private final AckService ackService = new AckService(this.brokerService);
    private String clientId;
    private String metaServer;
    private String appCode;
    private ClientInfo clientInfo;
    private int destroyWaitInSeconds;
    private EnvProvider envProvider;

    public void init() {
        this.metaInfoService.setMetaServer(this.metaServer);
        this.metaInfoService.setClientId(this.clientId);
        this.metaInfoService.init();
        this.ackService.setDestroyWaitInSeconds(this.destroyWaitInSeconds);
        this.ackService.setClientId(this.clientId);
        this.metaInfoService.setConsumerStateChangedListener(this);
        this.brokerService.setAppCode(this.appCode);
        this.brokerService.setClientInfo(this.clientInfo);
    }

    @Override // qunar.tc.qmq.consumer.register.ConsumerRegister
    public void regist(String str, String str2, RegistParam registParam) {
        synchronized (getConsumerLockKey(str, str2, "reg")) {
            if (this.envProvider != null) {
                String env = this.envProvider.env(str);
                if (!Strings.isNullOrEmpty(env)) {
                    String subEnv = this.envProvider.subEnv(env);
                    String subEnvIsolationGroup = toSubEnvIsolationGroup(str2, env, subEnv);
                    LOG.info("enable subenv isolation for {}/{}, rename consumer group to {}", new Object[]{str, str2, subEnvIsolationGroup});
                    str2 = subEnvIsolationGroup;
                    registParam.addFilter(new SubEnvIsolationPullFilter(env, subEnv));
                }
            }
            registPullEntry(str, str2, registParam, new AlwaysPullStrategy());
            if (RetrySubjectUtils.isDeadRetrySubject(str)) {
                return;
            }
            registPullEntry(RetrySubjectUtils.buildRetrySubject(str, str2), str2, registParam, new AlwaysPullStrategy());
        }
    }

    private String toSubEnvIsolationGroup(String str, String str2, String str3) {
        return str + "_" + str2 + "_" + str3;
    }

    private void registPullEntry(String str, String str2, RegistParam registParam, PullStrategy pullStrategy) {
        String buildSubscribeKey = MapKeyBuilder.buildSubscribeKey(str, str2);
        PullEntry pullEntry = this.pullEntryMap.get(buildSubscribeKey);
        if (pullEntry == PullEntry.EMPTY_PULL_ENTRY) {
            throw new DuplicateListenerException(buildSubscribeKey);
        }
        if (pullEntry == null) {
            pullEntry = createAndSubmitPullEntry(str, str2, registParam, pullStrategy);
        }
        if (this.isOnline) {
            pullEntry.online(registParam.getActionSrc());
        } else {
            pullEntry.offline(registParam.getActionSrc());
        }
    }

    private PullEntry createAndSubmitPullEntry(String str, String str2, RegistParam registParam, PullStrategy pullStrategy) {
        ParallelPullEntry parallelPullEntry = new ParallelPullEntry(new PushConsumerParam(str, str2, registParam), this.pullService, this.ackService, this.brokerService, pullStrategy, this.pullExecutor);
        this.pullEntryMap.put(MapKeyBuilder.buildSubscribeKey(str, str2), parallelPullEntry);
        parallelPullEntry.startPull();
        return parallelPullEntry;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PullConsumer createDefaultPullConsumer(String str, String str2, boolean z) {
        DefaultPullConsumer defaultPullConsumer = new DefaultPullConsumer(str, str2, z, this.clientId, this.pullService, this.ackService, this.brokerService);
        registerDefaultPullConsumer(defaultPullConsumer);
        return defaultPullConsumer;
    }

    private void registerDefaultPullConsumer(DefaultPullConsumer defaultPullConsumer) {
        String buildSubscribeKey = MapKeyBuilder.buildSubscribeKey(defaultPullConsumer.subject(), defaultPullConsumer.group());
        if (this.pullEntryMap.containsKey(buildSubscribeKey)) {
            throw new DuplicateListenerException(buildSubscribeKey);
        }
        this.pullEntryMap.put(buildSubscribeKey, PullEntry.EMPTY_PULL_ENTRY);
        this.pullConsumerMap.put(buildSubscribeKey, defaultPullConsumer);
        this.pullExecutor.submit(defaultPullConsumer);
    }

    @Override // qunar.tc.qmq.consumer.register.ConsumerRegister
    public void unregist(String str, String str2) {
        changeOnOffline(str, str2, false, StatusSource.CODE);
    }

    public void online(String str, String str2) {
        changeOnOffline(str, str2, true, StatusSource.OPS);
    }

    public void offline(String str, String str2) {
        changeOnOffline(str, str2, false, StatusSource.OPS);
    }

    private String getConsumerLockKey(String str, String str2, String str3) {
        return (str + ":" + str2 + ":" + str3).intern();
    }

    private void changeOnOffline(String str, String str2, boolean z, StatusSource statusSource) {
        synchronized (getConsumerLockKey(str, str2, "onOffline")) {
            String realSubject = RetrySubjectUtils.getRealSubject(str);
            String buildRetrySubject = RetrySubjectUtils.buildRetrySubject(realSubject, str2);
            String buildSubscribeKey = MapKeyBuilder.buildSubscribeKey(realSubject, str2);
            changeOnOffline(this.pullEntryMap.get(buildSubscribeKey), z, statusSource);
            changeOnOffline(this.pullEntryMap.get(MapKeyBuilder.buildSubscribeKey(buildRetrySubject, str2)), z, statusSource);
            DefaultPullConsumer defaultPullConsumer = this.pullConsumerMap.get(buildSubscribeKey);
            if (defaultPullConsumer == null) {
                return;
            }
            if (z) {
                defaultPullConsumer.online(statusSource);
            } else {
                defaultPullConsumer.offline(statusSource);
            }
        }
    }

    private void changeOnOffline(PullEntry pullEntry, boolean z, StatusSource statusSource) {
        if (pullEntry == null) {
            return;
        }
        synchronized (pullEntry) {
            if (z) {
                pullEntry.online(statusSource);
            } else {
                pullEntry.offline(statusSource);
            }
        }
    }

    @Override // qunar.tc.qmq.consumer.register.ConsumerRegister
    public void setAutoOnline(boolean z) {
        if (z) {
            online();
        } else {
            offline();
        }
        this.isOnline = z;
    }

    public boolean offline() {
        this.onlineLock.lock();
        try {
            this.isOnline = false;
            Iterator<PullEntry> it = this.pullEntryMap.values().iterator();
            while (it.hasNext()) {
                it.next().offline(StatusSource.HEALTHCHECKER);
            }
            Iterator<DefaultPullConsumer> it2 = this.pullConsumerMap.values().iterator();
            while (it2.hasNext()) {
                it2.next().offline(StatusSource.HEALTHCHECKER);
            }
            this.ackService.tryCleanAck();
            return true;
        } finally {
            this.onlineLock.unlock();
        }
    }

    public boolean online() {
        this.onlineLock.lock();
        try {
            this.isOnline = true;
            Iterator<PullEntry> it = this.pullEntryMap.values().iterator();
            while (it.hasNext()) {
                it.next().online(StatusSource.HEALTHCHECKER);
            }
            Iterator<DefaultPullConsumer> it2 = this.pullConsumerMap.values().iterator();
            while (it2.hasNext()) {
                it2.next().online(StatusSource.HEALTHCHECKER);
            }
            return true;
        } finally {
            this.onlineLock.unlock();
        }
    }

    public void setClientId(String str) {
        this.clientId = str;
    }

    public void setMetaServer(String str) {
        this.metaServer = str;
    }

    public void setEnvProvider(EnvProvider envProvider) {
        this.envProvider = envProvider;
    }

    public void setAppCode(String str) {
        this.appCode = str;
    }

    public void setClientInfo(ClientInfo clientInfo) {
        this.clientInfo = clientInfo;
    }

    @Override // qunar.tc.qmq.consumer.register.ConsumerRegister
    public void destroy() {
        Iterator<PullEntry> it = this.pullEntryMap.values().iterator();
        while (it.hasNext()) {
            it.next().destroy();
        }
        this.ackService.destroy();
    }

    public void setDestroyWaitInSeconds(int i) {
        this.destroyWaitInSeconds = i;
    }
}
