package com.aliyun.openservices.ons.api.impl.rocketmq;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Constants;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PropertyValueConst;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/* loaded from: input_file:com/aliyun/openservices/ons/api/impl/rocketmq/ConsumerImpl.class */
/**
 * {@link Consumer} implementation that keeps one {@link MessageListener} per topic and
 * dispatches every message handed over by the inherited {@code defaultMQPushConsumer}
 * to the listener registered for that message's topic.
 */
public class ConsumerImpl extends ONSConsumerAbstract implements Consumer {
    /** Topic name to the listener that was registered for it via {@link #subscribe}. */
    private final ConcurrentHashMap<String, MessageListener> subscribeTable = new ConcurrentHashMap<>();

    /**
     * Bridge between the RocketMQ concurrent-listener callback and the ONS
     * {@link MessageListener} registered for the message's topic.
     */
    class MessageListenerImpl implements MessageListenerConcurrently {
        MessageListenerImpl() {
        }

        /**
         * Converts the first message of the delivered list to an ONS {@link Message},
         * hands it to the listener registered for its topic and translates the
         * resulting {@link Action} into a {@link ConsumeConcurrentlyStatus}.
         *
         * <p>Only element 0 of {@code list} is looked at.
         * NOTE(review): presumably the consume batch size is 1 — confirm against the
         * consumer configuration.
         *
         * @param list messages delivered by the client; only the first one is read
         * @param consumeConcurrentlyContext RocketMQ consume context (not used here)
         * @return {@code CONSUME_SUCCESS} for {@code CommitMessage},
         *         {@code RECONSUME_LATER} for {@code ReconsumeLater}, and {@code null}
         *         when the listener returns {@code null} or any other action
         * @throws ONSClientException if no listener is registered for the message topic
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt first = list.get(0);
            Message converted = ONSUtil.msgConvert(first);
            Map<String, String> props = first.getProperties();

            // Start from the broker-side message id; a transaction id, when present, replaces it.
            converted.setMsgID(first.getMsgId());
            String transactionId = props != null ? props.get(Constants.TRANSACTION_ID) : null;
            if (transactionId != null) {
                converted.setMsgID(transactionId);
            }

            MessageListener listener = subscribeTable.get(converted.getTopic());
            if (listener == null) {
                throw new ONSClientException("MessageListener is null");
            }

            Action action = listener.consume(converted, new ConsumeContext());
            if (action == Action.CommitMessage) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            if (action == Action.ReconsumeLater) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            // A null or unrecognized action yields no status.
            return null;
        }
    }

    /**
     * Creates the consumer and applies the {@code PostSubscriptionWhenPull} and
     * {@code MessageModel} settings to the inherited {@code defaultMQPushConsumer}.
     *
     * @param properties client configuration; {@code PostSubscriptionWhenPull} falls back
     *                   to {@code "false"} and {@code MessageModel} to
     *                   {@link PropertyValueConst#CLUSTERING} when the key is absent
     */
    public ConsumerImpl(Properties properties) {
        super(properties);

        String postSubscription = properties.getProperty(PropertyKeyConst.PostSubscriptionWhenPull, "false");
        this.defaultMQPushConsumer.setPostSubscriptionWhenPull(Boolean.parseBoolean(postSubscription));

        String modelName = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(modelName));
    }

    /**
     * Registers a fresh {@link MessageListenerImpl} on the underlying push consumer and
     * then delegates to the superclass {@code start()}.
     */
    @Override
    public void start() {
        // Typed as MessageListenerConcurrently so the concurrent-listener overload is chosen.
        MessageListenerConcurrently dispatcher = new MessageListenerImpl();
        this.defaultMQPushConsumer.registerMessageListener(dispatcher);
        super.start();
    }

    /**
     * Records {@code messageListener} as the listener for topic {@code str} and forwards
     * the subscription to the superclass.
     *
     * @param str topic name; must not be {@code null}
     * @param str2 second argument passed through unchanged to the superclass {@code subscribe}
     * @param messageListener listener to invoke for messages of this topic; must not be {@code null}
     * @throws ONSClientException if the topic or the listener is {@code null}
     */
    @Override
    public void subscribe(String str, String str2, MessageListener messageListener) {
        if (str == null) {
            throw new ONSClientException("topic is null");
        }
        if (messageListener == null) {
            throw new ONSClientException("listener is null");
        }

        // The listener is stored before the superclass subscription is attempted.
        this.subscribeTable.put(str, messageListener);
        super.subscribe(str, str2);
    }

    /**
     * Drops the listener registered for topic {@code str} and forwards the
     * unsubscription to the superclass. A {@code null} topic is ignored.
     *
     * @param str topic name, may be {@code null}
     */
    @Override
    public void unsubscribe(String str) {
        if (str == null) {
            return;
        }
        this.subscribeTable.remove(str);
        super.unsubscribe(str);
    }
}
