package org.redisson;

import ch.qos.logback.core.pattern.color.ANSIConstants;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import java.time.Instant;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import org.redisson.api.RReliableTopic;
import org.redisson.api.StreamMessageId;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate;
import uk.co.idv.method.entities.otp.policy.delivery.phone.AcceptableSimSwapStatuses;

/* loaded from: input_file:BOOT-INF/lib/redisson-3.16.6.jar:org/redisson/RedissonReliableTopic.class */
public class RedissonReliableTopic extends RedissonExpirable implements RReliableTopic {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RedissonReliableTopic.class);
    private final Map<String, Entry> listeners;
    private final AtomicReference<String> subscriberId;
    private volatile RFuture<Map<StreamMessageId, Map<String, Object>>> readFuture;
    private volatile Timeout timeoutTask;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/redisson-3.16.6.jar:org/redisson/RedissonReliableTopic$Entry.class */
    /**
     * Immutable pairing of a listener with the message type it was registered for.
     * The type is consulted at dispatch time to decide whether a decoded message is
     * handed to the listener.
     */
    public static class Entry {
        private final Class<?> type;
        private final MessageListener<?> listener;

        /**
         * @param type     message class the listener was registered for
         * @param listener callback to invoke for matching messages
         */
        Entry(Class<?> type, MessageListener<?> listener) {
            this.listener = listener;
            this.type = type;
        }

        /** @return the message class this entry was registered for */
        public Class<?> getType() {
            return type;
        }

        /** @return the registered listener */
        public MessageListener<?> getListener() {
            return listener;
        }
    }

    /**
     * Creates a reliable topic handle that uses an explicit codec.
     *
     * @param codec                codec passed to the superclass
     * @param commandAsyncExecutor executor used to issue Redis commands
     * @param str                  name of the topic
     */
    public RedissonReliableTopic(Codec codec, CommandAsyncExecutor commandAsyncExecutor, String str) {
        super(codec, commandAsyncExecutor, str);
        // Diamond instead of the raw type so the map stays type-checked as Map<String, Entry>.
        this.listeners = new ConcurrentHashMap<>();
        this.subscriberId = new AtomicReference<>();
    }

    /**
     * Creates a reliable topic handle without specifying a codec; codec selection is left to the superclass.
     *
     * @param commandAsyncExecutor executor used to issue Redis commands
     * @param str                  name of the topic
     */
    public RedissonReliableTopic(CommandAsyncExecutor commandAsyncExecutor, String str) {
        super(commandAsyncExecutor, str);
        // Diamond instead of the raw type so the map stays type-checked as Map<String, Entry>.
        this.listeners = new ConcurrentHashMap<>();
        this.subscriberId = new AtomicReference<>();
    }

    /** @return name of the key holding the subscribers set, derived from the topic name with the "subscribers" suffix */
    private String getSubscribersName() {
        final String topicName = getRawName();
        return suffixName(topicName, "subscribers");
    }

    /**
     * @return name of the key holding the per-subscriber hash (written with {@code hset} by the
     *         registration and polling scripts), derived from the topic name with the "map" suffix
     */
    private String getMapName() {
        // The suffix is written as a literal: the decompiled code borrowed the identical value from
        // Spring's BeanDefinitionParserDelegate.MAP_ELEMENT ("map"), which made a Redis key name
        // depend on an unrelated XML-parsing class.
        return suffixName(getRawName(), "map");
    }

    /** @return name of the key holding the counter incremented by the scripts, derived from the topic name with the "counter" suffix */
    private String getCounter() {
        final String topicName = getRawName();
        return suffixName(topicName, "counter");
    }

    /**
     * @return name of the key holding the sorted set of subscriber timeouts, derived from the topic name
     */
    private String getTimeout() {
        // NOTE(review): the suffix is read from an application class constant that has nothing to do
        // with this library - this looks like a decompiler constant substitution. Presumably the value
        // is "timeout"; confirm and inline the literal.
        final String topicName = getRawName();
        return suffixName(topicName, AcceptableSimSwapStatuses.TIMEOUT);
    }

    /**
     * Synchronous counterpart of {@link #publishAsync(Object)}: resolves the returned future through {@code get}.
     *
     * @param obj message to publish
     * @return the number produced by the publish script (the cardinality of the subscribers set)
     */
    @Override // org.redisson.api.RReliableTopic
    public long publish(Object obj) {
        final Long subscribers = (Long) get(publishAsync(obj));
        return subscribers.longValue();
    }

    /**
     * Synchronous counterpart of {@link #addListenerAsync(Class, MessageListener)}.
     *
     * @param cls             message type the listener is interested in
     * @param messageListener callback to register
     * @return the generated listener id
     */
    @Override // org.redisson.api.RReliableTopic
    public <M> String addListener(Class<M> cls, MessageListener<M> messageListener) {
        final RFuture<String> registration = addListenerAsync(cls, messageListener);
        return (String) get(registration);
    }

    /**
     * Synchronous counterpart of {@link #removeListenerAsync(String...)}.
     *
     * @param strArr ids of the listeners to remove
     */
    @Override // org.redisson.api.RReliableTopic
    public void removeListener(String... strArr) {
        final RFuture<Void> removal = removeListenerAsync(strArr);
        get(removal);
    }

    /** Synchronous counterpart of {@link #removeAllListenersAsync()}. */
    @Override // org.redisson.api.RReliableTopic
    public void removeAllListeners() {
        final RFuture<Void> removal = removeAllListenersAsync();
        get(removal);
    }

    /**
     * Drops every locally registered listener and then unregisters this instance's subscriber.
     *
     * @return future of the subscriber removal
     */
    @Override // org.redisson.api.RReliableTopicAsync
    public RFuture<Void> removeAllListenersAsync() {
        // Local listeners go first; the subscriber entry in Redis is removed afterwards.
        this.listeners.clear();
        final RFuture<Void> unsubscribed = removeSubscriber();
        return unsubscribed;
    }

    /**
     * Synchronous counterpart of {@link #sizeAsync()}.
     *
     * @return the length reported by {@code XLEN} for the topic key
     */
    @Override // org.redisson.api.RReliableTopic
    public long size() {
        final Long length = (Long) get(sizeAsync());
        return length.longValue();
    }

    /**
     * Issues {@code XLEN} against the topic key.
     *
     * @return future with the reported length
     */
    @Override // org.redisson.api.RReliableTopicAsync
    public RFuture<Long> sizeAsync() {
        final String streamKey = getRawName();
        return this.commandExecutor.readAsync(streamKey, StringCodec.INSTANCE, RedisCommands.XLEN, streamKey);
    }

    /** @return number of listeners currently registered on this instance (local map size, no Redis call) */
    @Override // org.redisson.api.RReliableTopic
    public int countListeners() {
        final Map<String, Entry> registered = this.listeners;
        return registered.size();
    }

    /**
     * Appends the encoded message to the topic key under field {@code m} via {@code xadd} and returns
     * the cardinality of the subscribers set, both in a single script.
     *
     * @param obj message to publish
     * @return future with the value returned by the script ({@code zcard} of the subscribers set)
     */
    @Override // org.redisson.api.RReliableTopicAsync
    public RFuture<Long> publishAsync(Object obj) {
        final String topicKey = getRawName();
        final String script =
                "redis.call('xadd', KEYS[1], '*', 'm', ARGV[1]); return redis.call('zcard', KEYS[2]); ";
        return this.commandExecutor.evalWriteAsync(
                topicKey,
                StringCodec.INSTANCE,
                RedisCommands.EVAL_LONG,
                script,
                Arrays.asList(getRawName(), getSubscribersName()),
                encode(obj));
    }

    /**
     * Produces a listener/subscriber id: 16 random bytes rendered as a hex string.
     *
     * @return the hex dump of the random bytes
     */
    protected String generateId() {
        final byte[] randomBytes = new byte[16];
        ThreadLocalRandom.current().nextBytes(randomBytes);
        final String hexId = ByteBufUtil.hexDump(randomBytes);
        return hexId;
    }

    /**
     * Registers a listener locally and, if this instance has no subscriber yet, registers one in Redis
     * and starts polling the stream.
     *
     * <p>The first listener's id doubles as the subscriber id. Later listeners only join the local map
     * and complete immediately.
     *
     * <p>If the registration script fails, the local state created by this call (listener entry,
     * watchdog timer, subscriber id) is rolled back so that a later call can attempt to subscribe
     * again; previously the instance stayed marked as subscribed without ever polling.
     *
     * @param cls             message type the listener is interested in
     * @param messageListener callback to register
     * @return future completed with the generated listener id, or failed with the registration error
     */
    @Override // org.redisson.api.RReliableTopicAsync
    public <M> RFuture<String> addListenerAsync(Class<M> cls, MessageListener<M> messageListener) {
        String listenerId = generateId();
        this.listeners.put(listenerId, new Entry(cls, messageListener));

        // Only the caller that wins this CAS creates the subscriber; everyone else piggybacks on it.
        if (!this.subscriberId.compareAndSet(null, listenerId)) {
            return RedissonPromise.newSucceededFuture(listenerId);
        }

        renewExpiration();
        StreamMessageId startId = new StreamMessageId(System.currentTimeMillis(), 0L);
        RedissonPromise<String> result = new RedissonPromise<>();
        RFuture<Void> registration = this.commandExecutor.evalWriteNoRetryAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "local value = redis.call('incr', KEYS[3]); redis.call('zadd', KEYS[4], ARGV[3], ARGV[2]); redis.call('zadd', KEYS[1], value, ARGV[2]); redis.call('hset', KEYS[2], ARGV[2], ARGV[1]); ", Arrays.asList(getSubscribersName(), getMapName(), getCounter(), getTimeout()), startId, listenerId, Long.valueOf(System.currentTimeMillis() + this.commandExecutor.getConnectionManager().getCfg().getReliableTopicWatchdogTimeout()));
        registration.onComplete((ignored, error) -> {
            if (error != null) {
                // The caller never learns the id on failure, so it could not remove the listener itself.
                this.listeners.remove(listenerId);
                Timeout watchdog = this.timeoutTask;
                if (watchdog != null) {
                    watchdog.cancel();
                }
                // Release the subscriber slot only if it is still ours.
                this.subscriberId.compareAndSet(listenerId, null);
                result.tryFailure(error);
            } else {
                poll(listenerId, startId);
                result.trySuccess(listenerId);
            }
        });
        return result;
    }

    /**
     * Issues one blocking stream read starting after {@code streamMessageId} and, when it completes,
     * dispatches the messages to the listeners and schedules the next read.
     *
     * <p>Flow on completion:
     * <ul>
     *   <li>cancelled read: stop;</li>
     *   <li>failed read: stop on {@code RedissonShutdownException}, otherwise retry from the same id;</li>
     *   <li>successful read: hand each message (stream field {@code m}, as written by the publish
     *       script) to every listener whose registered type matches, on the connection manager's
     *       executor; then, if listeners remain, run the bookkeeping script and poll again from the
     *       last received id when the script reports the subscriber as still registered.</li>
     * </ul>
     *
     * <p>The bookkeeping script records the subscriber's new position, removes the subscriber with the
     * lowest timeout score if that score is older than the supplied current time, and trims or deletes
     * the stream based on the position stored for the first entry of the subscribers set.
     *
     * @param str             subscriber id of this instance
     * @param streamMessageId id after which messages are read
     */
    private void poll(String str, StreamMessageId streamMessageId) {
        this.readFuture = this.commandExecutor.readAsync(getRawName(), new CompositeCodec(StringCodec.INSTANCE, this.codec), RedisCommands.XREAD_BLOCKING_SINGLE, "BLOCK", 0, "STREAMS", getRawName(), streamMessageId);
        this.readFuture.onComplete((messages, readError) -> {
            if (this.readFuture.isCancelled()) {
                return;
            }
            if (readError != null) {
                if (readError instanceof RedissonShutdownException) {
                    return;
                }
                poll(str, streamMessageId);
                return;
            }

            // Listener callbacks run on the shared executor, not on the thread completing the read.
            this.commandExecutor.getConnectionManager().getExecutor().execute(() -> {
                messages.values().forEach(fields -> {
                    // "m" is the stream field the publish script stores the payload under.
                    Object message = fields.get("m");
                    this.listeners.values().forEach(entry -> {
                        if (entry.getType().isInstance(message)) {
                            // Safe: the isInstance check above matches the type the listener was registered with.
                            @SuppressWarnings("unchecked")
                            MessageListener<Object> listener = (MessageListener<Object>) entry.getListener();
                            listener.onMessage(getRawName(), message);
                        }
                    });
                });
            });

            if (this.listeners.isEmpty()) {
                return;
            }

            // NOTE(review): assumes a successful read always carries at least one message; an empty map
            // would make skip(-1) throw. Confirm against the XREAD_BLOCKING_SINGLE decoder.
            StreamMessageId lastId = messages.keySet().stream().skip(messages.size() - 1).findFirst().get();
            RFuture<Boolean> updateFuture = this.commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local r = redis.call('zscore', KEYS[2], ARGV[2]); if r ~= false then local value = redis.call('incr', KEYS[4]); redis.call('zadd', KEYS[2], value, ARGV[2]); redis.call('hset', KEYS[3], ARGV[2], ARGV[1]); end; local t = redis.call('zrange', KEYS[5], 0, 0, 'WITHSCORES'); if tonumber(t[2]) < tonumber(ARGV[3]) then redis.call('hdel', KEYS[3], t[1]); redis.call('zrem', KEYS[2], t[1]); redis.call('zrem', KEYS[5], t[1]); end; local v = redis.call('zrange', KEYS[2], 0, 0); local score = redis.call('hget', KEYS[3], v[1]); local range = redis.call('xrange', KEYS[1], score, '+'); if #range == 0 then redis.call('del', KEYS[1]); elseif #range == 1 and range[1][1] == score then redis.call('del', KEYS[1]); else redis.call('xtrim', KEYS[1], 'maxlen', #range); end;return r ~= false; ", Arrays.asList(getRawName(), getSubscribersName(), getMapName(), getCounter(), getTimeout()), lastId, str, Long.valueOf(System.currentTimeMillis()));
            updateFuture.onComplete((stillSubscribed, updateError) -> {
                if (updateError != null) {
                    if (updateError instanceof RedissonShutdownException) {
                        return;
                    }
                    log.error("Unable to update subscriber status", updateError);
                    return;
                }
                // Stop when the subscriber was removed on the server side or no listeners are left locally.
                if (!stillSubscribed.booleanValue() || this.listeners.isEmpty()) {
                    return;
                }
                poll(str, lastId);
            });
        });
    }

    /**
     * Deletes the topic key together with its four auxiliary keys (subscribers, map, counter, timeout).
     *
     * @return future of the multi-key delete
     */
    @Override // org.redisson.RedissonObject, org.redisson.api.RObjectAsync
    public RFuture<Boolean> deleteAsync() {
        final String topicKey = getRawName();
        final String subscribersKey = getSubscribersName();
        final String mapKey = getMapName();
        final String counterKey = getCounter();
        final String timeoutKey = getTimeout();
        return deleteAsync(topicKey, subscribersKey, mapKey, counterKey, timeoutKey);
    }

    /**
     * Asks the superclass for the memory size of the topic key and its four auxiliary keys.
     *
     * @return future produced by the superclass for the five keys
     */
    @Override // org.redisson.RedissonObject, org.redisson.api.RObjectAsync
    public RFuture<Long> sizeInMemoryAsync() {
        final String topicKey = getRawName();
        final String subscribersKey = getSubscribersName();
        final String mapKey = getMapName();
        final String counterKey = getCounter();
        final String timeoutKey = getTimeout();
        return super.sizeInMemoryAsync(Arrays.asList(topicKey, subscribersKey, mapKey, counterKey, timeoutKey));
    }

    /**
     * Applies the given time-to-live to the topic key and its four auxiliary keys.
     *
     * @param j        time-to-live amount
     * @param timeUnit unit of {@code j}
     * @return future of the multi-key expire
     */
    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public RFuture<Boolean> expireAsync(long j, TimeUnit timeUnit) {
        final String topicKey = getRawName();
        final String subscribersKey = getSubscribersName();
        final String mapKey = getMapName();
        final String counterKey = getCounter();
        final String timeoutKey = getTimeout();
        return expireAsync(j, timeUnit, topicKey, subscribersKey, mapKey, counterKey, timeoutKey);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sets an absolute expiration on the topic key and its four auxiliary keys.
     *
     * @param j      expiration timestamp, forwarded unchanged to the superclass
     * @param strArr ignored; the key list is always this topic's own five keys
     * @return future of the multi-key expire-at
     */
    @Override // org.redisson.RedissonExpirable
    public RFuture<Boolean> expireAtAsync(long j, String... strArr) {
        final String topicKey = getRawName();
        final String subscribersKey = getSubscribersName();
        final String mapKey = getMapName();
        final String counterKey = getCounter();
        final String timeoutKey = getTimeout();
        return super.expireAtAsync(j, topicKey, subscribersKey, mapKey, counterKey, timeoutKey);
    }

    /**
     * Clears the expiration of the topic key and its four auxiliary keys.
     *
     * @return future of the multi-key clear-expire
     */
    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public RFuture<Boolean> clearExpireAsync() {
        final String topicKey = getRawName();
        final String subscribersKey = getSubscribersName();
        final String mapKey = getMapName();
        final String counterKey = getCounter();
        final String timeoutKey = getTimeout();
        return clearExpireAsync(topicKey, subscribersKey, mapKey, counterKey, timeoutKey);
    }

    /**
     * Removes the listeners with the given ids from the local registry. When that leaves no listeners
     * at all, the subscriber itself is unregistered.
     *
     * @param strArr ids of the listeners to remove
     * @return future of the subscriber removal, or an already-completed future if listeners remain
     */
    @Override // org.redisson.api.RReliableTopicAsync
    public RFuture<Void> removeListenerAsync(String... strArr) {
        this.listeners.keySet().removeAll(Arrays.asList(strArr));
        if (!this.listeners.isEmpty()) {
            // Other listeners still depend on the subscription; nothing to do in Redis.
            return RedissonPromise.newSucceededFuture(null);
        }
        return removeSubscriber();
    }

    /**
     * Stops local polling and the watchdog timer, then removes this instance's subscriber id from the
     * timeout set, the subscribers set and the per-subscriber hash.
     *
     * <p>Safe to call when no subscription is active: {@code readFuture} and {@code timeoutTask} are
     * only assigned once a subscription has been started (and {@code readFuture} only after the
     * registration script has completed), so both are null-checked, and no script is sent when there
     * is no subscriber id to remove.
     *
     * @return future of the removal script, or an already-completed future when nothing was registered
     */
    private RFuture<Void> removeSubscriber() {
        // Read each volatile field once so the null check and the call see the same reference.
        RFuture<Map<StreamMessageId, Map<String, Object>>> pendingRead = this.readFuture;
        if (pendingRead != null) {
            pendingRead.cancel(false);
        }
        Timeout watchdog = this.timeoutTask;
        if (watchdog != null) {
            watchdog.cancel();
        }

        String id = this.subscriberId.getAndSet(null);
        if (id == null) {
            // No subscriber was registered by this instance; there is nothing to remove in Redis.
            return RedissonPromise.newSucceededFuture(null);
        }
        return this.commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "redis.call('zrem', KEYS[3], ARGV[1]); redis.call('zrem', KEYS[1], ARGV[1]); redis.call('hdel', KEYS[2], ARGV[1]); ", Arrays.asList(getSubscribersName(), getMapName(), getTimeout()), id);
    }

    /**
     * Synchronous counterpart of {@link #countSubscribersAsync()}.
     *
     * @return cardinality of the subscribers set
     */
    @Override // org.redisson.api.RReliableTopic
    public int countSubscribers() {
        final Integer subscribers = (Integer) get(countSubscribersAsync());
        return subscribers.intValue();
    }

    /**
     * Reads the cardinality of the subscribers set.
     *
     * @return future with the subscriber count
     */
    @Override // org.redisson.api.RReliableTopicAsync
    public RFuture<Integer> countSubscribersAsync() {
        final String topicKey = getRawName();
        final String subscribersKey = getSubscribersName();
        return this.commandExecutor.readAsync(topicKey, StringCodec.INSTANCE, RedisCommands.ZCARD_INT, subscribersKey);
    }

    /**
     * Schedules the watchdog that keeps this subscriber's entry in the timeout set fresh.
     *
     * <p>The task fires after one third of the configured reliable-topic watchdog timeout and pushes
     * the subscriber's score to "now + watchdog timeout". It reschedules itself only while the script
     * reports the subscriber as still present; on error it logs and stops.
     */
    private void renewExpiration() {
        this.timeoutTask = this.commandExecutor.getConnectionManager().newTimeout(timeout -> {
            String id = this.subscriberId.get();
            if (id == null) {
                // The subscriber was removed while this tick was pending; nothing left to renew.
                return;
            }
            // Typed local: the chained form left the result type unpinned, so booleanValue() below
            // had no Boolean to resolve against.
            RFuture<Boolean> renewal = this.commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('zscore', KEYS[1], ARGV[2]) == false then return 0; end; redis.call('zadd', KEYS[1], ARGV[1], ARGV[2]); return 1; ", Arrays.asList(getTimeout()), Long.valueOf(System.currentTimeMillis() + this.commandExecutor.getConnectionManager().getCfg().getReliableTopicWatchdogTimeout()), id);
            renewal.onComplete((renewed, error) -> {
                if (error != null) {
                    log.error("Can't update reliable topic " + getRawName() + " expiration time", error);
                } else if (renewed.booleanValue()) {
                    renewExpiration();
                }
            });
        }, this.commandExecutor.getConnectionManager().getCfg().getReliableTopicWatchdogTimeout() / 3, TimeUnit.MILLISECONDS);
    }

    /**
     * Bridge method (marked bridge/synthetic by the decompiler) that only forwards to the superclass
     * implementation; it adds no topic-specific behavior.
     */
    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture remainTimeToLiveAsync() {
        return super.remainTimeToLiveAsync();
    }

    /**
     * Bridge method (marked bridge/synthetic by the decompiler) that only forwards to the superclass
     * implementation; it adds no topic-specific behavior.
     */
    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ long remainTimeToLive() {
        return super.remainTimeToLive();
    }

    /**
     * Bridge method (marked bridge/synthetic by the decompiler) that only forwards to the superclass
     * implementation; it adds no topic-specific behavior.
     */
    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean clearExpire() {
        return super.clearExpire();
    }

    /**
     * Bridge method (marked bridge/synthetic by the decompiler) that only forwards the {@link Date}
     * argument to the superclass implementation.
     */
    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireAtAsync(Date date) {
        return super.expireAtAsync(date);
    }

    /**
     * Bridge method (marked bridge/synthetic by the decompiler) that only forwards the {@link Date}
     * argument to the superclass implementation.
     */
    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expireAt(Date date) {
        return super.expireAt(date);
    }

    /**
     * Bridge method (marked bridge/synthetic by the decompiler) that only forwards the {@link Instant}
     * argument to the superclass implementation.
     */
    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireAsync(Instant instant) {
        return super.expireAsync(instant);
    }

    /**
     * Bridge method (marked bridge/synthetic by the decompiler) that only forwards the {@link Instant}
     * argument to the superclass implementation.
     */
    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expire(Instant instant) {
        return super.expire(instant);
    }

    /**
     * Bridge method (marked bridge/synthetic by the decompiler) that only forwards the timestamp to the
     * superclass single-argument implementation.
     */
    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireAtAsync(long j) {
        return super.expireAtAsync(j);
    }

    /**
     * Bridge method (marked bridge/synthetic by the decompiler) that only forwards the timestamp to the
     * superclass implementation.
     */
    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expireAt(long j) {
        return super.expireAt(j);
    }

    /**
     * Bridge method (marked bridge/synthetic by the decompiler) that only forwards the time-to-live and
     * its unit to the superclass implementation.
     */
    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expire(long j, TimeUnit timeUnit) {
        return super.expire(j, timeUnit);
    }
}
