package com.alibaba.rsqldb.storage.rocketmq;

import com.alibaba.rsqldb.common.RSQLConstant;
import com.alibaba.rsqldb.common.exception.DeserializeException;
import com.alibaba.rsqldb.common.exception.RSQLServerException;
import com.alibaba.rsqldb.parser.model.Node;
import com.alibaba.rsqldb.parser.model.statement.CreateTableStatement;
import com.alibaba.rsqldb.parser.model.statement.CreateViewStatement;
import com.alibaba.rsqldb.parser.model.statement.Statement;
import com.alibaba.rsqldb.storage.api.Command;
import com.alibaba.rsqldb.storage.api.CommandQueue;
import com.alibaba.rsqldb.storage.api.CommandSerDe;
import com.alibaba.rsqldb.storage.api.CommandStatus;
import com.alibaba.rsqldb.storage.api.CommandWrapper;
import com.alibaba.rsqldb.storage.api.serialize.DefaultCommandSerDe;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.streams.core.running.RocketMQClient;
import org.apache.rocketmq.streams.core.util.RocketMQUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/rsqldb/storage/rocketmq/RocketMQStorage.class */
public class RocketMQStorage implements CommandQueue {
    private static final Logger logger = LoggerFactory.getLogger(RocketMQStorage.class);
    private RocketMQClient rocketMQClient;
    private String topicName;
    private DefaultLitePullConsumer pullConsumer;
    private DefaultMQProducer producer;
    private DefaultMQAdminExt mqAdmin;
    private Collection<MessageQueue> commandMessageQueue;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final CommandSerDe commandSerDe = new DefaultCommandSerDe();
    private final ConcurrentHashMap<String, Statement> tableCache = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, CompletableFuture<Throwable>> preCommandMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Command> commandMap = new ConcurrentHashMap<>();
    private final LinkedList<Command> restoreCommand = new LinkedList<>();
    private final LinkedList<MessageExt> cache = new LinkedList<>();

    public void start(Properties properties) {
        String property = properties.getProperty("namesrvAddr");
        if (StringUtils.isBlank(property)) {
            throw new RSQLServerException("namesrv can not be blank.");
        }
        String str = RSQLConstant.RocketMQ.SQL_GROUP_NAME;
        if (StringUtils.isBlank(str)) {
            throw new RSQLServerException("groupName can not be blank.");
        }
        this.topicName = RSQLConstant.RocketMQ.SQL_TOPIC_NAME;
        if (StringUtils.isBlank(this.topicName)) {
            throw new RSQLServerException("topicName can not be blank.");
        }
        build(property, str);
        try {
            RocketMQUtil.createStaticCompactTopic(this.mqAdmin, this.topicName, 1, (Set) null);
            this.pullConsumer.start();
            this.producer.start();
            Collection<MessageQueue> fetchMessageQueues = this.pullConsumer.fetchMessageQueues(this.topicName);
            if (fetchMessageQueues == null || fetchMessageQueues.size() != 1) {
                throw new RSQLServerException("command topic queue not equals 1. messageQueue=" + fetchMessageQueues);
            }
            this.commandMessageQueue = fetchMessageQueues;
        } catch (Exception e) {
            throw new RSQLServerException("start localStore error.", e);
        }
    }

    public void build(String str, String str2) {
        this.rocketMQClient = new RocketMQClient(str);
        this.pullConsumer = new DefaultLitePullConsumer(str2);
        this.pullConsumer.setNamesrvAddr(str);
        this.pullConsumer.setMessageModel(MessageModel.BROADCASTING);
        this.pullConsumer.setAutoCommit(false);
        this.producer = this.rocketMQClient.producer(str2);
        try {
            this.mqAdmin = this.rocketMQClient.getMQAdmin();
        } catch (MQClientException e) {
            throw new RSQLServerException(e);
        }
    }

    public CompletableFuture<Boolean> restore() throws Throwable {
        this.pullConsumer.setPullBatchSize(1000);
        this.pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        this.pullConsumer.assign(this.commandMessageQueue);
        this.pullConsumer.seekToBegin((MessageQueue) this.commandMessageQueue.toArray()[0]);
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        this.executor.submit(() -> {
            try {
                pullToLast();
                commit();
                this.pullConsumer.setPullBatchSize(1);
                completableFuture.complete(true);
            } catch (Throwable th) {
                completableFuture.complete(false);
                logger.error("pull to last error.", th);
                throw new RSQLServerException(th);
            }
        });
        return completableFuture;
    }

    public CompletableFuture<Throwable> putCommand(Command command) throws Throwable {
        if (checkExist(command)) {
            CompletableFuture<Throwable> completableFuture = new CompletableFuture<>();
            completableFuture.complete(null);
            return completableFuture;
        }
        byte[] serialize = this.commandSerDe.serialize(command);
        String jobId = command.getJobId();
        CompletableFuture<Throwable> completableFuture2 = new CompletableFuture<>();
        CompletableFuture<Throwable> put = this.preCommandMap.put(jobId, completableFuture2);
        if (put != null) {
            logger.warn("find uncompleted completableFuture, completed it.");
            put.complete(null);
        }
        try {
            Message message = new Message(this.topicName, serialize);
            message.setKeys(jobId);
            message.putUserProperty("body.type", command.getClass().getName());
            Logger logger2 = logger;
            Object[] objArr = new Object[4];
            objArr[0] = this.topicName;
            objArr[1] = jobId;
            objArr[2] = command.getNode() == null ? null : command.getNode().getContent();
            objArr[3] = command.getStatus();
            logger2.info("put statement into rocketmq command topic:{} with jobId:[{}], command:[{}], status:[{}]", objArr);
            this.producer.send(message);
            return completableFuture2;
        } catch (Throwable th) {
            throw new RSQLServerException("put sql to command topic error.", th);
        }
    }

    private boolean checkExist(Command command) throws Throwable {
        String jobId = command.getJobId();
        Command command2 = this.commandMap.get(jobId);
        if (command2 != null && command2.getStatus() == command.getStatus()) {
            String format = String.format("exist a command has same jobId and status in commandMap, not executed yet, exist command:[%s].", command2);
            logger.error(format);
            throw new RSQLServerException(format);
        }
        if (this.preCommandMap.get(jobId) == null) {
            return false;
        }
        String format2 = String.format("exist a command with same jobId, not executed yet, exist command:[%s].", command2);
        logger.error(format2);
        throw new RSQLServerException(format2);
    }

    public CommandWrapper getNextCommand() throws Throwable {
        if (this.restoreCommand.size() != 0) {
            Command pop = this.restoreCommand.pop();
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.complete(null);
            return new CommandWrapper(pop, new RocketMQCallBack(completableFuture, this::commitStatus));
        }
        MessageExt pollFromStore = pollFromStore();
        if (pollFromStore == null) {
            return null;
        }
        Command deserializeAndSaveTable = deserializeAndSaveTable(pollFromStore);
        String jobId = deserializeAndSaveTable.getJobId();
        CompletableFuture<Throwable> remove = this.preCommandMap.remove(jobId);
        if (remove == null) {
            logger.info("CompletableFuture is empty in local, the command maybe submit in other RSQLDB instance, jobId:{}", jobId);
            Iterator it = this.preCommandMap.keySet().iterator();
            while (it.hasNext()) {
                logger.info("jobId in preCommandMap: {}", (String) it.next());
            }
            remove = new CompletableFuture<>();
        }
        return new CommandWrapper(deserializeAndSaveTable, new RocketMQCallBack(remove, this::commitStatus, this::commit));
    }

    private MessageExt pollFromStore() {
        MessageExt messageExt;
        if (this.cache.size() != 0) {
            messageExt = this.cache.pop();
        } else {
            List poll = this.pullConsumer.poll(10L);
            if (poll == null || poll.size() == 0) {
                return null;
            }
            if (poll.size() > 1) {
                this.pullConsumer.setPullBatchSize(1);
                logger.info("batch message num greater than 1, cache it first.");
                this.cache.addAll(poll);
                messageExt = this.cache.pop();
            } else {
                messageExt = (MessageExt) poll.get(0);
            }
        }
        return messageExt;
    }

    private void commitStatus(String str, Command command) {
        if (command.getStatus() != CommandStatus.REMOVED) {
            Command put = this.commandMap.put(str, command);
            if (put != null) {
                logger.info("change command, jobId:{}, status from:[{}] to:[{}]", new Object[]{str, put.getStatus(), command.getStatus()});
                return;
            }
            return;
        }
        Command remove = this.commandMap.remove(str);
        if (remove != null) {
            logger.info("remove command from cache, command:[{}]", remove);
            Statement node = remove.getNode();
            if ((node instanceof CreateTableStatement) || (node instanceof CreateViewStatement)) {
                String tableName = node.getTableName();
                if (this.tableCache.remove(tableName) != null) {
                    logger.warn("remove table from cache. tableName:{}", tableName);
                }
            }
        }
    }

    public Statement findTable(String str) {
        if (this.tableCache.containsKey(str)) {
            return this.tableCache.get(str);
        }
        throw new RSQLServerException("Statement with tableName=" + str + " not exist.");
    }

    public Command queryStatus(String str) {
        Command command = this.commandMap.get(str);
        if (command != null) {
            return command;
        }
        CompletableFuture<Throwable> completableFuture = this.preCommandMap.get(str);
        if (completableFuture != null) {
            try {
                completableFuture.get(10L, TimeUnit.SECONDS);
            } catch (Throwable th) {
                return null;
            }
        }
        return null;
    }

    public List<Command> queryStatus() {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.commandMap.keySet().iterator();
        while (it.hasNext()) {
            arrayList.add(this.commandMap.get((String) it.next()));
        }
        return arrayList;
    }

    public CompletableFuture<Throwable> delete(String str) throws Throwable {
        return putCommand(new Command(str, (Node) null, CommandStatus.REMOVED));
    }

    public void close() throws Exception {
        this.producer.shutdown();
        this.pullConsumer.shutdown();
        this.mqAdmin.shutdown();
        this.executor.shutdown();
    }

    private void commit() {
        this.pullConsumer.commit(new HashSet(this.commandMessageQueue), true);
    }

    private void pullToLast() throws DeserializeException {
        List<MessageExt> arrayList = new ArrayList<>();
        List poll = this.pullConsumer.poll(100L);
        while (true) {
            List list = poll;
            if (list == null || list.size() == 0) {
                break;
            }
            arrayList.addAll(list);
            if (arrayList.size() <= 1000) {
                poll = this.pullConsumer.poll(100L);
            } else {
                replayState(arrayList);
                arrayList.clear();
                poll = this.pullConsumer.poll(100L);
            }
        }
        if (arrayList.size() != 0) {
            replayState(arrayList);
        }
    }

    private void replayState(List<MessageExt> list) throws DeserializeException {
        if (list == null || list.size() == 0) {
            return;
        }
        Map map = (Map) ((Stream) list.stream().parallel()).collect(Collectors.groupingBy((v0) -> {
            return v0.getKeys();
        }));
        Iterator it = map.keySet().iterator();
        while (it.hasNext()) {
            List<MessageExt> sortByQueueOffset = sortByQueueOffset((List) map.get((String) it.next()));
            MessageExt messageExt = sortByQueueOffset.get(sortByQueueOffset.size() - 1);
            if (!"true".equals(messageExt.getUserProperty("empty_body"))) {
                this.restoreCommand.add(deserializeAndSaveTable(messageExt));
            }
        }
    }

    private Command deserializeAndSaveTable(MessageExt messageExt) throws DeserializeException {
        String userProperty = messageExt.getUserProperty("body.type");
        if (!Command.class.getName().equals(userProperty)) {
            throw new DeserializeException("unknown class name: " + userProperty);
        }
        Command deserialize = this.commandSerDe.deserialize(messageExt.getBody());
        Statement node = deserialize.getNode();
        if ((node instanceof CreateTableStatement) || (node instanceof CreateViewStatement)) {
            Statement statement = node;
            this.tableCache.put(statement.getTableName(), statement);
        }
        return deserialize;
    }

    private List<MessageExt> sortByQueueOffset(List<MessageExt> list) {
        if (list == null || list.size() == 0) {
            return new ArrayList();
        }
        list.sort((messageExt, messageExt2) -> {
            long queueOffset = messageExt.getQueueOffset() - messageExt2.getQueueOffset();
            if (queueOffset > 0) {
                return 1;
            }
            return queueOffset < 0 ? -1 : 0;
        });
        return list;
    }
}
