package com.leng.project.redisqueue.handler;

import com.leng.project.redisqueue.RedisQueueTemplate;
import com.leng.project.redisqueue.annotation.RedisDelayQueueListener;
import com.leng.project.redisqueue.annotation.RedisQueueListener;
import com.leng.project.redisqueue.bean.Annotation;
import com.leng.project.redisqueue.bean.Message;
import java.lang.reflect.Method;
import java.util.List;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.RedisConnectionFailureException;

/* loaded from: input_file:com/leng/project/redisqueue/handler/QueueHandler.class */
/**
 * Wires listener methods to Redis-backed queues.
 *
 * <p>For every registered listener this class registers the queue with the
 * {@link RedisQueueTemplate} and starts one or more consumer threads. Each thread
 * repeatedly takes a batch of messages, invokes the listener method reflectively with
 * each message's data, and acknowledges the message when auto-ack is disabled.
 */
public class QueueHandler {
    private static final Logger log = LoggerFactory.getLogger(QueueHandler.class);

    /** Queue type recorded for {@link RedisQueueListener} methods (meaning defined by the template). */
    private static final int QUEUE_TYPE_STANDARD = 1;

    /** Queue type recorded for {@link RedisDelayQueueListener} methods (meaning defined by the template). */
    private static final int QUEUE_TYPE_DELAY = 2;

    /** Pause, in milliseconds, before polling again after a Redis connection failure. */
    private static final long CONNECTION_FAILURE_BACKOFF_MILLIS = 1000L;

    /** Log format for consumption failures; a trailing Throwable argument is logged as the exception. */
    private static final String CONSUME_ERROR_FORMAT = "队列消费出错：queue={}, message={}";

    @Autowired
    private RedisQueueTemplate redisQueueTemplate;

    /**
     * Registers {@code method} as a consumer of the delay queue described by the annotation.
     *
     * @param redisDelayQueueListener listener annotation supplying queue name, consumer count,
     *                                auto-ack flag, prefetch and frequency
     * @param obj                     instance on which {@code method} is invoked
     * @param method                  listener method; the type of its first parameter is recorded
     *                                as the message class
     * @throws ArrayIndexOutOfBoundsException if {@code method} declares no parameters
     */
    public void registerDelayListener(RedisDelayQueueListener redisDelayQueueListener, Object obj, Method method) {
        Annotation annotation = new Annotation();
        annotation.setQueue(redisDelayQueueListener.queue());
        annotation.setConsumers(redisDelayQueueListener.consumers());
        annotation.setAutoAck(redisDelayQueueListener.autoAck());
        annotation.setPrefetch(redisDelayQueueListener.prefetch());
        annotation.setFrequency(redisDelayQueueListener.frequency());
        annotation.setClazz(method.getParameterTypes()[0]);
        annotation.setQueueType(QUEUE_TYPE_DELAY);
        registerListener0(annotation, obj, method, () -> this.redisQueueTemplate.takeDelayMessage(annotation));
    }

    /**
     * Registers {@code method} as a consumer of the queue described by the annotation.
     *
     * @param redisQueueListener listener annotation supplying queue name, consumer count,
     *                           auto-ack flag, prefetch and frequency
     * @param obj                instance on which {@code method} is invoked
     * @param method             listener method; the type of its first parameter is recorded
     *                           as the message class
     * @throws ArrayIndexOutOfBoundsException if {@code method} declares no parameters
     */
    public void registerListener(RedisQueueListener redisQueueListener, Object obj, Method method) {
        Annotation annotation = new Annotation();
        annotation.setQueue(redisQueueListener.queue());
        annotation.setConsumers(redisQueueListener.consumers());
        annotation.setAutoAck(redisQueueListener.autoAck());
        annotation.setPrefetch(redisQueueListener.prefetch());
        annotation.setFrequency(redisQueueListener.frequency());
        annotation.setClazz(method.getParameterTypes()[0]);
        annotation.setQueueType(QUEUE_TYPE_STANDARD);
        registerListener0(annotation, obj, method, () -> this.redisQueueTemplate.takeMessage(annotation));
    }

    /**
     * Registers the queue with the template and starts the consumer threads.
     *
     * <p>At least one thread is started even when the configured consumer count is below one.
     */
    private void registerListener0(Annotation annotation, Object obj, Method method, Supplier<List<Message<?>>> supplier) {
        this.redisQueueTemplate.registerQueue(annotation.getQueue(), annotation.getQueueType());
        int consumerCount = Math.max(1, annotation.getConsumers());
        for (int i = 1; i <= consumerCount; i++) {
            // Named threads make consumers identifiable in thread dumps and log output.
            String threadName = "redis-queue-" + annotation.getQueue() + "-" + i;
            new Thread(() -> consumeLoop(annotation, obj, method, supplier), threadName).start();
        }
    }

    /**
     * Polls the queue until the current thread is interrupted.
     *
     * <p>A Redis connection failure is logged at warn level and followed by a short pause so an
     * outage does not turn into a busy loop. Any other failure is logged and polling continues.
     */
    private void consumeLoop(Annotation annotation, Object obj, Method method, Supplier<List<Message<?>>> supplier) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                pollOnce(annotation, obj, method, supplier);
            } catch (RedisConnectionFailureException e) {
                log.warn(e.getMessage());
                pause(CONNECTION_FAILURE_BACKOFF_MILLIS);
            } catch (Exception e) {
                // Per-message failures are handled in pollOnce; what reaches here happened while
                // fetching the batch, so there is no message to report.
                log.error(CONSUME_ERROR_FORMAT, annotation.getQueue(), null, e);
            }
        }
    }

    /**
     * Takes one batch from the supplier and dispatches every message to the listener method.
     *
     * <p>A failure while handling one message is logged together with that message and does not
     * prevent the remaining messages of the batch from being handled. A failed message is not
     * acknowledged. Redis connection failures are propagated to the caller.
     */
    private void pollOnce(Annotation annotation, Object obj, Method method, Supplier<List<Message<?>>> supplier) {
        List<Message<?>> batch = supplier.get();
        log.debug("当前消费线程：queue={}, id={}", annotation.getQueue(), Thread.currentThread().getId());
        if (batch == null) {
            return;
        }
        for (Message<?> message : batch) {
            try {
                method.invoke(obj, message.getData());
                if (!annotation.isAutoAck()) {
                    this.redisQueueTemplate.ack(message);
                }
            } catch (RedisConnectionFailureException e) {
                // Connectivity problems affect the whole batch; let the loop warn and back off.
                throw e;
            } catch (Exception e) {
                log.error(CONSUME_ERROR_FORMAT, annotation.getQueue(), message, e);
            }
        }
    }

    /** Sleeps for the given time; restores the interrupt flag if interrupted so the loop can stop. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
