package com.leng.project.redisqueue;

import com.alibaba.fastjson.JSONObject;
import com.leng.project.redisqueue.annotation.RedisSubscribeListener;
import com.leng.project.redisqueue.bean.Annotation;
import com.leng.project.redisqueue.bean.DelayMessageParam;
import com.leng.project.redisqueue.bean.Message;
import com.leng.project.redisqueue.exception.RedisDistributedLockException;
import com.leng.project.redisqueue.lock.RedisDistributedLock;
import com.leng.project.redisqueue.utils.StringUtils;
import jakarta.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.Assert;

/* loaded from: input_file:com/leng/project/redisqueue/RedisQueueTemplate.class */
public class RedisQueueTemplate {
    private static final Logger log = LoggerFactory.getLogger(RedisQueueTemplate.class);

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private RedisDistributedLock distributedLock;
    private Map<String, Integer> allQueueMap = null;

    @PostConstruct
    private void init() {
        this.allQueueMap = new HashMap();
        Set members = this.redisTemplate.opsForSet().members(Constant.getAllQueueKey());
        if (members == null || members.isEmpty()) {
            return;
        }
        members.forEach(str -> {
            String[] split = str.split(";");
            this.allQueueMap.put(split[0] + ";" + split[1], Integer.valueOf(split[2]));
        });
    }

    @RedisSubscribeListener(channel = Constant.REFRESH_ALL_QUEUE_CACHE_CHANNEL)
    public void refreshAllQueueMap() {
        init();
    }

    public <T> void sendDelayMessage(final String str, final T t, final long j, final TimeUnit timeUnit) {
        registerQueue(str, 2);
        final long epochSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).getEpochSecond();
        final String uuid = StringUtils.getUUID();
        this.redisTemplate.execute(new SessionCallback() { // from class: com.leng.project.redisqueue.RedisQueueTemplate.1
            public Object execute(RedisOperations redisOperations) throws DataAccessException {
                redisOperations.multi();
                redisOperations.opsForHash().put(Constant.getQueueDataKey(str), uuid, JSONObject.toJSONString(t));
                redisOperations.opsForZSet().add(Constant.getQueueKey(str), uuid, epochSecond + timeUnit.toSeconds(j));
                redisOperations.exec();
                return null;
            }
        });
        if (log.isDebugEnabled()) {
            log.debug("发送延时队列消息：队列={}，消息={}，延迟时间={}", new Object[]{str, JSONObject.toJSONString(t), timeUnit.toSeconds(j) + "秒"});
        }
    }

    public <T> void sendDelayMessageAll(final String str, List<T> list, long j, TimeUnit timeUnit) {
        Assert.notEmpty(list, "消息不能为空");
        registerQueue(str, 2);
        Double valueOf = Double.valueOf(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).getEpochSecond() + timeUnit.toSeconds(j));
        Object[] objArr = new Object[list.size()];
        final HashMap hashMap = new HashMap();
        final HashSet hashSet = new HashSet();
        int i = 0;
        for (T t : list) {
            String uuid = StringUtils.getUUID();
            hashMap.put(uuid, JSONObject.toJSONString(t));
            objArr[i] = uuid;
            hashSet.add(new DefaultTypedTuple(uuid, valueOf));
            i++;
        }
        this.redisTemplate.execute(new SessionCallback() { // from class: com.leng.project.redisqueue.RedisQueueTemplate.2
            public Object execute(RedisOperations redisOperations) throws DataAccessException {
                redisOperations.multi();
                redisOperations.opsForHash().putAll(Constant.getQueueDataKey(str), hashMap);
                redisOperations.opsForZSet().add(Constant.getQueueKey(str), hashSet);
                redisOperations.exec();
                return null;
            }
        });
        if (log.isDebugEnabled()) {
            log.debug("发送延时队列消息：队列={}，消息={}，延迟时间={}", new Object[]{str, JSONObject.toJSONString(list), timeUnit.toSeconds(j) + "秒"});
        }
    }

    public void sendDelayMessageAll(final String str, List<DelayMessageParam> list) {
        Assert.notEmpty(list, "消息不能为空");
        registerQueue(str, 2);
        long epochSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).getEpochSecond();
        Object[] objArr = new Object[list.size()];
        final HashMap hashMap = new HashMap();
        final HashSet hashSet = new HashSet();
        int i = 0;
        for (DelayMessageParam delayMessageParam : list) {
            String uuid = StringUtils.getUUID();
            hashMap.put(uuid, JSONObject.toJSONString(delayMessageParam.getData()));
            objArr[i] = uuid;
            hashSet.add(new DefaultTypedTuple(uuid, Double.valueOf(epochSecond + delayMessageParam.getTimeUnit().toSeconds(delayMessageParam.getDelay()))));
            i++;
        }
        this.redisTemplate.execute(new SessionCallback() { // from class: com.leng.project.redisqueue.RedisQueueTemplate.3
            public Object execute(RedisOperations redisOperations) throws DataAccessException {
                redisOperations.multi();
                redisOperations.opsForHash().putAll(Constant.getQueueDataKey(str), hashMap);
                redisOperations.opsForZSet().add(Constant.getQueueKey(str), hashSet);
                redisOperations.exec();
                return null;
            }
        });
        if (log.isDebugEnabled()) {
            log.debug("发送延时队列消息：队列={}，消息={}", str, JSONObject.toJSONString(list));
        }
    }

    public <T> void sendMessage(final String str, final T t) {
        registerQueue(str, 1);
        final String uuid = StringUtils.getUUID();
        this.redisTemplate.execute(new SessionCallback() { // from class: com.leng.project.redisqueue.RedisQueueTemplate.4
            public Object execute(RedisOperations redisOperations) throws DataAccessException {
                redisOperations.multi();
                redisOperations.opsForHash().put(Constant.getQueueDataKey(str), uuid, JSONObject.toJSONString(t));
                redisOperations.opsForList().rightPush(Constant.getQueueKey(str), uuid);
                redisOperations.exec();
                return null;
            }
        });
        if (log.isDebugEnabled()) {
            log.debug("发送队列消息：队列={}，消息={}", str, JSONObject.toJSONString(t));
        }
    }

    public <T> void sendMessageAll(final String str, List<T> list) {
        Assert.notEmpty(list, "消息不能为空");
        registerQueue(str, 1);
        final Object[] objArr = new Object[list.size()];
        final HashMap hashMap = new HashMap();
        int i = 0;
        for (T t : list) {
            String uuid = StringUtils.getUUID();
            hashMap.put(uuid, JSONObject.toJSONString(t));
            objArr[i] = uuid;
            i++;
        }
        this.redisTemplate.execute(new SessionCallback() { // from class: com.leng.project.redisqueue.RedisQueueTemplate.5
            public Object execute(RedisOperations redisOperations) throws DataAccessException {
                redisOperations.multi();
                redisOperations.opsForHash().putAll(Constant.getQueueDataKey(str), hashMap);
                redisOperations.opsForList().rightPushAll(Constant.getQueueKey(str), objArr);
                redisOperations.exec();
                return null;
            }
        });
        if (log.isDebugEnabled()) {
            log.debug("发送队列消息：队列={}，消息={}", str, JSONObject.toJSONString(list));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long getEpochSecond() {
        return LocalDateTime.now().toInstant(ZoneOffset.of("+8")).getEpochSecond();
    }

    public List<Message<?>> takeDelayMessage(Annotation annotation) {
        String queueKey = Constant.getQueueKey(annotation.getQueue());
        return takeMessage0(annotation, () -> {
            return this.redisTemplate.opsForZSet().rangeByScore(queueKey, 0.0d, getEpochSecond(), 0L, Math.max(annotation.getPrefetch(), 1));
        }, (collection, map) -> {
            return (List) this.redisTemplate.execute(new SessionCallback<List<Message<?>>>() { // from class: com.leng.project.redisqueue.RedisQueueTemplate.6
                /* renamed from: execute, reason: merged with bridge method [inline-methods] */
                public List<Message<?>> m3execute(RedisOperations redisOperations) throws DataAccessException {
                    redisOperations.multi();
                    redisOperations.opsForZSet().remove(queueKey, collection.toArray(new String[collection.size()]));
                    Stream stream = collection.stream();
                    Map map = map;
                    Annotation annotation2 = annotation;
                    List<Message<?>> list = (List) stream.map(str -> {
                        Message message = (Message) map.get(str);
                        if (message == null) {
                            RedisQueueTemplate.log.warn("消息数据为空：messageId={}", str);
                            return null;
                        }
                        message.setType(2);
                        if (annotation2.isAutoAck()) {
                            RedisQueueTemplate.this.redisTemplate.opsForHash().delete(Constant.getQueueDataKey(annotation2.getQueue()), new Object[]{str});
                        } else {
                            redisOperations.opsForZSet().add(Constant.getAckQueueKey(), RedisQueueTemplate.this.buildUnAckMessage(str, annotation2.getQueue(), 2), RedisQueueTemplate.this.getEpochSecond());
                        }
                        return message;
                    }).filter(message -> {
                        return message != null;
                    }).collect(Collectors.toList());
                    redisOperations.exec();
                    return list;
                }
            });
        });
    }

    private void unAckMessageRequeue0(Annotation annotation) {
        String ackQueueKey = Constant.getAckQueueKey();
        takeMessage0(annotation, () -> {
            return this.redisTemplate.opsForZSet().rangeByScore(ackQueueKey, 0.0d, getEpochSecond(), 0L, Math.max(annotation.getPrefetch(), 1));
        }, (collection, map) -> {
            this.redisTemplate.execute(new SessionCallback<Void>() { // from class: com.leng.project.redisqueue.RedisQueueTemplate.7
                /* renamed from: execute, reason: merged with bridge method [inline-methods] */
                public Void m4execute(RedisOperations redisOperations) throws DataAccessException {
                    redisOperations.multi();
                    redisOperations.opsForZSet().remove(ackQueueKey, collection.toArray(new String[collection.size()]));
                    collection.stream().forEach(str -> {
                        String[] split = str.split(";");
                        String str = split[0];
                        String queueKey = Constant.getQueueKey(split[1]);
                        int intValue = Integer.valueOf(split[2]).intValue();
                        if (1 == intValue) {
                            redisOperations.opsForList().leftPush(queueKey, str);
                        } else if (2 == intValue) {
                            redisOperations.opsForZSet().add(queueKey, str, RedisQueueTemplate.this.getEpochSecond());
                        } else {
                            RedisQueueTemplate.log.warn("不支持的待确认消息类型：{}", Integer.valueOf(intValue));
                        }
                    });
                    redisOperations.exec();
                    return null;
                }
            });
            return null;
        });
    }

    public void unAckMessageRequeue(int i) {
        Annotation annotation = new Annotation();
        annotation.setConsumers(i);
        annotation.setQueue(Constant.getAckQueueKey());
        annotation.setPrefetch(200);
        annotation.setFrequency(900L);
        for (int i2 = 1; i2 <= Math.max(1, annotation.getConsumers()); i2++) {
            new Thread(() -> {
                while (true) {
                    try {
                        unAckMessageRequeue0(annotation);
                    } catch (Exception e) {
                        log.error("待确认消息重新入队出错：{}", e.getMessage(), e);
                    }
                }
            }).start();
        }
    }

    public List<Message<?>> takeMessage0(Annotation annotation, Supplier<Collection<String>> supplier, BiFunction<Collection<String>, Map<String, Message>, List<Message<?>>> biFunction) {
        String lockKey;
        String tryLock;
        Collection<String> collection;
        while (true) {
            try {
                if (annotation.getFrequency() < 1) {
                    annotation.setFrequency(2L);
                }
                TimeUnit.SECONDS.sleep(annotation.getFrequency());
            } catch (InterruptedException e) {
            }
            Collection<String> collection2 = supplier.get();
            if (collection2 != null && !collection2.isEmpty()) {
                lockKey = Constant.getLockKey(annotation.getQueue());
                try {
                    tryLock = this.distributedLock.tryLock(lockKey, 3000L, 180);
                    try {
                        collection = supplier.get();
                        if (collection != null && !collection.isEmpty()) {
                            break;
                        }
                        this.distributedLock.unlock(lockKey, tryLock);
                    } catch (Throwable th) {
                        this.distributedLock.unlock(lockKey, tryLock);
                        throw th;
                    }
                } catch (RedisDistributedLockException e2) {
                }
            }
        }
        HashMap hashMap = new HashMap();
        if (annotation.getClazz() != null) {
            ArrayList arrayList = new ArrayList(collection);
            List multiGet = this.redisTemplate.opsForHash().multiGet(Constant.getQueueDataKey(annotation.getQueue()), arrayList);
            for (int i = 0; i < arrayList.size(); i++) {
                String str = (String) arrayList.get(i);
                Message message = new Message();
                message.setId(str);
                message.setQueue(annotation.getQueue());
                message.setData(JSONObject.parseObject((String) multiGet.get(i), annotation.getClazz()));
                hashMap.put(str, message);
            }
        }
        List<Message<?>> apply = biFunction.apply(collection, hashMap);
        this.distributedLock.unlock(lockKey, tryLock);
        return apply;
    }

    public List<Message<?>> takeMessage(Annotation annotation) {
        String queueKey = Constant.getQueueKey(annotation.getQueue());
        return takeMessage0(annotation, () -> {
            return this.redisTemplate.opsForList().range(queueKey, 0L, annotation.getPrefetch());
        }, (collection, map) -> {
            return (List) this.redisTemplate.execute(new SessionCallback<List<Message<?>>>() { // from class: com.leng.project.redisqueue.RedisQueueTemplate.8
                /* renamed from: execute, reason: merged with bridge method [inline-methods] */
                public List<Message<?>> m5execute(RedisOperations redisOperations) throws DataAccessException {
                    redisOperations.multi();
                    Stream stream = collection.stream();
                    String str = queueKey;
                    Map map = map;
                    Annotation annotation2 = annotation;
                    List<Message<?>> list = (List) stream.map(str2 -> {
                        redisOperations.opsForList().remove(str, 1L, str2);
                        Message message = (Message) map.get(str2);
                        if (message == null) {
                            RedisQueueTemplate.log.warn("消息数据为空：messageId={}", str2);
                            return null;
                        }
                        message.setType(1);
                        if (annotation2.isAutoAck()) {
                            redisOperations.opsForHash().delete(Constant.getQueueDataKey(annotation2.getQueue()), new Object[]{str2});
                        } else {
                            redisOperations.opsForZSet().add(Constant.getAckQueueKey(), RedisQueueTemplate.this.buildUnAckMessage(str2, annotation2.getQueue(), 1), RedisQueueTemplate.this.getEpochSecond());
                        }
                        return message;
                    }).filter(message -> {
                        return message != null;
                    }).collect(Collectors.toList());
                    redisOperations.exec();
                    return list;
                }
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String buildUnAckMessage(String str, String str2, int i) {
        return str + ";" + str2 + ";" + i;
    }

    public void ack(final Message message) {
        this.redisTemplate.execute(new SessionCallback() { // from class: com.leng.project.redisqueue.RedisQueueTemplate.9
            public Object execute(RedisOperations redisOperations) throws DataAccessException {
                redisOperations.multi();
                redisOperations.opsForZSet().remove(Constant.getAckQueueKey(), new Object[]{RedisQueueTemplate.this.buildUnAckMessage(message.getId(), message.getQueue(), message.getType().intValue())});
                redisOperations.opsForHash().delete(Constant.getQueueDataKey(message.getQueue()), new Object[]{message.getId()});
                redisOperations.exec();
                return null;
            }
        });
    }

    public long queueSize(String str) {
        String queueKey = Constant.getQueueKey(str);
        DataType type = this.redisTemplate.type(queueKey);
        if (type == null) {
            return 0L;
        }
        if (type == DataType.LIST) {
            return this.redisTemplate.opsForList().size(queueKey).longValue();
        }
        if (type == DataType.ZSET) {
            return this.redisTemplate.opsForZSet().size(queueKey).longValue();
        }
        if (type == DataType.SET) {
            return this.redisTemplate.opsForSet().size(queueKey).longValue();
        }
        if (type == DataType.HASH) {
            return this.redisTemplate.opsForHash().size(queueKey).longValue();
        }
        return 0L;
    }

    public void registerQueue(String str, int i) {
        Integer num = this.allQueueMap.get(str + ";" + Constant.getVirtualHost());
        if (num != null) {
            if (num.intValue() != i) {
                throw new RuntimeException("队列类型不匹配");
            }
            return;
        }
        this.redisTemplate.opsForSet().add(Constant.getAllQueueKey(), new String[]{str + ";" + Constant.getVirtualHost() + ";" + i});
        if (num == null) {
            sendChannelMessage(Constant.REFRESH_ALL_QUEUE_CACHE_CHANNEL);
        }
    }

    public <T> void sendChannelMessage(String str) {
        this.redisTemplate.convertAndSend(str, "");
    }

    public <T> void sendChannelMessage(String str, T t) {
        this.redisTemplate.convertAndSend(str, JSONObject.toJSONString(t));
    }
}
