package com.alibaba.rsqldb.rest.service.iml;

import com.alibaba.rsqldb.common.exception.RSQLClientException;
import com.alibaba.rsqldb.common.exception.RSQLServerException;
import com.alibaba.rsqldb.parser.impl.BuildContext;
import com.alibaba.rsqldb.parser.model.Node;
import com.alibaba.rsqldb.parser.model.statement.Statement;
import com.alibaba.rsqldb.rest.service.Engin;
import com.alibaba.rsqldb.rest.service.RSQLConfig;
import com.alibaba.rsqldb.rest.service.RSQLConfigBuilder;
import com.alibaba.rsqldb.rest.spi.ServiceLoader;
import com.alibaba.rsqldb.storage.api.Command;
import com.alibaba.rsqldb.storage.api.CommandQueue;
import com.alibaba.rsqldb.storage.api.CommandStatus;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PreDestroy;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.streams.core.RocketMQStream;
import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/alibaba/rsqldb/rest/service/iml/RSQLEngin.class */
public class RSQLEngin implements Engin {
    private static final Logger logger = LoggerFactory.getLogger(RSQLEngin.class);
    private static final String PRODUCER_GROUP = "RSQL_PRODUCER_GROUP";

    /** Engine configuration, built once from the injected {@link RSQLConfigBuilder}. */
    private final RSQLConfig rsqlConfig;
    /** Storage-backed command queue, loaded through the {@link ServiceLoader} SPI. */
    private final CommandQueue commandQueue;
    private final TaskFactory taskFactory;
    /** Work queue backing {@link #executor}. */
    private final BlockingQueue<Runnable> cacheQueue = new LinkedBlockingQueue<>();
    /** Holds the failure that prevented the engine from starting; checked by {@code validate()}. */
    private AtomicReference<RSQLServerException> holder = new AtomicReference<>();
    /** Set to {@code true} by {@link #shutdown()}. */
    private volatile boolean stop = false;
    /** Stream instances keyed by job id. */
    private HashMap<String, RocketMQStream> rStreams = new HashMap<>();
    /**
     * Producer handed to every {@link BuildContext}. Assigned in the constructor rather than by a
     * field initializer, because building it reads {@link #rsqlConfig}, which is only assigned in
     * the constructor body (field initializers run before that body).
     */
    private final DefaultMQProducer producer;
    /** Single-threaded executor that runs the engine loop. */
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60000L, TimeUnit.MILLISECONDS, this.cacheQueue, new ThreadFactoryImpl("RSQL_EnginThread_"));

    /*
     * Compiler-generated (synthetic) lookup table for a switch over CommandStatus, as emitted by
     * the decompiler. It maps RUNNING -> 1, STOPPED -> 2, REMOVED -> 3.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$rsqldb$storage$api$CommandStatus = new int[CommandStatus.values().length];

        static {
            try {
                $SwitchMap$com$alibaba$rsqldb$storage$api$CommandStatus[CommandStatus.RUNNING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$alibaba$rsqldb$storage$api$CommandStatus[CommandStatus.STOPPED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$alibaba$rsqldb$storage$api$CommandStatus[CommandStatus.REMOVED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /**
     * Builds the configuration, creates the producer, loads and starts the command queue, and
     * kicks off an asynchronous restore of the queue.
     *
     * <p>When the restore completes with {@code true} the engine is started via {@link #start()}.
     * When it completes with {@code false}/{@code null}, or when the restore or the start fails
     * with an exception, the failure is stored in {@link #holder}.
     *
     * @param rSQLConfigBuilder source of the engine configuration and of the storage properties
     * @param taskFactory       factory used to turn statements into stream tasks
     * @param serviceLoader     SPI loader used to obtain the {@link CommandQueue} implementation
     * @throws RSQLServerException if the storage module cannot be loaded or started, or if
     *                             triggering the restore fails synchronously
     */
    public RSQLEngin(RSQLConfigBuilder rSQLConfigBuilder, TaskFactory taskFactory, ServiceLoader serviceLoader) {
        this.rsqlConfig = rSQLConfigBuilder.build();
        this.taskFactory = taskFactory;
        // Must come after rsqlConfig is assigned: producer() reads the name server address from it.
        this.producer = producer();
        try {
            this.commandQueue = (CommandQueue) serviceLoader.load(CommandQueue.class, this.rsqlConfig.getStorage());
            this.commandQueue.start(rSQLConfigBuilder.getProperties());
            this.commandQueue.restore().thenAcceptAsync(restored -> {
                if (restored != null && restored.booleanValue()) {
                    start();
                } else {
                    this.holder.set(new RSQLServerException("can not start engin, because restore failed."));
                }
            }).exceptionally(cause -> {
                // Without this stage an exceptional restore, or a failing start(), would vanish
                // inside the future and the engine would look healthy while never running.
                logger.error("restore or start engin error.", cause);
                this.holder.set(new RSQLServerException(cause));
                return null;
            });
        } catch (Exception th) {
            logger.error("load storage module error.", th);
            throw new RSQLServerException(th);
        }
    }

    /**
     * Initialises the task factory with the command queue's table lookup, starts the RocketMQ
     * producer and submits the engine loop to the executor.
     *
     * @throws RSQLServerException if the producer fails to start
     */
    @Override // com.alibaba.rsqldb.rest.service.Engin
    public void start() {
        // Evaluating a bound method reference null-checks its receiver, so no separate
        // null probe on the command queue is needed before building it.
        this.taskFactory.init(this.commandQueue::findTable);
        try {
            this.producer.start();
            this.executor.submit(this::runInLoop);
        } catch (MQClientException startFailure) {
            throw new RSQLServerException(startFailure);
        }
        logger.info("start engin success!");
    }

    /**
     * Rethrows the failure stored in {@code holder}, if there is one; otherwise does nothing.
     *
     * @throws RSQLServerException the stored failure
     */
    private void validate() {
        final RSQLServerException recordedFailure = this.holder.get();
        if (recordedFailure == null) {
            return;
        }
        throw recordedFailure;
    }

    /*
     * Task submitted to the executor by start().
     *
     * NOTE(review): the decompiler (JADX) could not reconstruct this method. It reported:
     *   - "Failed to find 'out' block for switch in B:9:0x0075"
     *   - removed duplicated regions for blocks B:22:0x0124 and B:32:0x0000
     *   - "Code decompiled incorrectly"; the skipped instruction dump has 433 instructions.
     * As written here the method only throws UnsupportedOperationException, so the real logic
     * must be recovered from the original sources or the bytecode.
     *
     * Presumably the original loop switches on CommandStatus (a synthetic switch map for that
     * enum exists in this class) -- TODO confirm against the original implementation.
     */
    private void runInLoop() {
        /*
            Method body not available: the decompiler skipped it (433 instructions).
            JADX can print the raw dump with '--comments-level debug', or the
            partially-correct code with '--show-bad-code'.
        */
        throw new UnsupportedOperationException("Method not decompiled: com.alibaba.rsqldb.rest.service.iml.RSQLEngin.runInLoop():void");
    }

    /**
     * Builds a stream topology for the given command and starts it under the command's job id.
     *
     * <p>If the task factory returns no build context the command is logged as non-runnable and
     * nothing is started. If a stream is already registered for the job id, it is replaced by the
     * new one and stopped before the new one is started.
     *
     * @param command the command carrying the job id and the statement to run
     * @throws Throwable anything raised while dispatching, building or starting the stream
     */
    private void startStream(Command command) throws Throwable {
        final String jobId = command.getJobId();
        final Statement statement = command.getNode();
        final BuildContext initialContext = new BuildContext(this.producer, jobId);
        logger.info("【prepare stream task】, with jobId={}, command={}", jobId, statement.getContent());

        final BuildContext builtContext = this.taskFactory.dispatch(statement, initialContext);
        if (builtContext == null) {
            logger.info("non-runnable task, command:[{}]", command);
            return;
        }

        final TopologyBuilder topology = builtContext.getStreamBuilder().build();

        // The name server address goes in first; the settings collected at build time are
        // added afterwards, so they win on a key clash.
        final Properties streamProps = new Properties();
        streamProps.put("rocketmq.namesrv.addr", this.rsqlConfig.getNamesrvAddr());
        streamProps.putAll(builtContext.getConfigSetAtBuild());

        final RocketMQStream freshStream = new RocketMQStream(topology, streamProps);
        final RocketMQStream replacedStream = this.rStreams.put(jobId, freshStream);
        if (replacedStream != null) {
            logger.warn("jobId replaced, jobId=[{}], new sql content=[{}]", jobId, statement.getContent());
            replacedStream.stop();
        }

        freshStream.start();
        logger.info("【start stream task】, with jobId:[{}] and sql content=[{}]", jobId, statement.getContent());
    }

    /**
     * Enqueues a new command for the given job.
     *
     * @param str  the job id
     * @param node the parsed statement the command carries
     * @param z    {@code true} to enqueue the command as {@code RUNNING}, {@code false} as
     *             {@code STOPPED}
     * @return the future returned by the command queue for this submission
     * @throws Throwable if a failure is stored in {@code holder}, or the queue rejects the command
     */
    @Override // com.alibaba.rsqldb.rest.service.Engin
    public CompletableFuture<Throwable> putCommand(String str, Node node, boolean z) throws Throwable {
        validate();
        final CommandStatus initialStatus;
        if (z) {
            initialStatus = CommandStatus.RUNNING;
        } else {
            initialStatus = CommandStatus.STOPPED;
        }
        final Command submission = new Command(str, node, initialStatus);
        return this.commandQueue.putCommand(submission);
    }

    /**
     * Returns the status of every command known to the command queue.
     *
     * @return the list produced by the command queue's status query
     * @throws RSQLServerException if a failure is stored in {@code holder}
     */
    @Override // com.alibaba.rsqldb.rest.service.Engin
    public List<Command> queryAll() {
        validate();
        final List<Command> allCommands = this.commandQueue.queryStatus();
        return allCommands;
    }

    /**
     * Looks up the command registered for a single job.
     *
     * @param str the job id
     * @return whatever the command queue's status query returns for that job id; callers in this
     *         class treat {@code null} as "no such job"
     * @throws RSQLServerException if a failure is stored in {@code holder}
     */
    @Override // com.alibaba.rsqldb.rest.service.Engin
    public Command queryByJobId(String str) {
        validate();
        final Command found = this.commandQueue.queryStatus(str);
        return found;
    }

    /**
     * Stops a job by enqueueing a {@code STOPPED} command for it and waiting for the queue to
     * acknowledge it.
     *
     * @param str the job id
     * @throws RSQLClientException if no command exists for the job id, or the job is already
     *                             stopped
     * @throws Throwable           any other failure raised while submitting or waiting
     */
    @Override // com.alibaba.rsqldb.rest.service.Engin
    public void terminate(String str) throws Throwable {
        validate();
        final Command current = queryByJobId(str);

        // Guard: the job must exist.
        if (current == null) {
            final String message = String.format("the command is empty corresponding to jobId: %s", str);
            logger.error(message);
            throw new RSQLClientException(message);
        }

        // Guard: a job that is already stopped cannot be stopped again.
        if (current.getStatus() == CommandStatus.STOPPED) {
            final String message = String.format("jobId=[%s] is terminated, does not need terminated.", str);
            logger.error(message);
            throw new RSQLClientException(message);
        }

        final Command stopRequest = new Command(str, current.getNode(), CommandStatus.STOPPED);
        wait4Finish(this.commandQueue.putCommand(stopRequest));
    }

    /**
     * Restarts a job by enqueueing a {@code RUNNING} command for it and waiting for the queue to
     * acknowledge it.
     *
     * @param str the job id
     * @throws RSQLClientException if no command exists for the job id, or the job is already
     *                             running
     * @throws Throwable           any other failure raised while submitting or waiting
     */
    @Override // com.alibaba.rsqldb.rest.service.Engin
    public void restart(String str) throws Throwable {
        validate();
        final Command current = queryByJobId(str);

        // Guard: the job must exist.
        if (current == null) {
            final String message = String.format("the command is empty corresponding to jobId: %s", str);
            logger.error(message);
            throw new RSQLClientException(message);
        }

        // Guard: a running job has nothing to restart.
        if (current.getStatus() == CommandStatus.RUNNING) {
            final String message = String.format("jobId=[%s] is running, does not need restart.", str);
            logger.error(message);
            throw new RSQLClientException(message);
        }

        final Command runRequest = new Command(str, current.getNode(), CommandStatus.RUNNING);
        wait4Finish(this.commandQueue.putCommand(runRequest));
    }

    /**
     * Deletes a job from the command queue and waits for the deletion to be acknowledged.
     * A running job must be terminated before it can be removed.
     *
     * @param str the job id
     * @throws RSQLClientException if no command exists for the job id, or the job is still
     *                             running
     * @throws Throwable           any other failure raised while deleting or waiting
     */
    @Override // com.alibaba.rsqldb.rest.service.Engin
    public void remove(String str) throws Throwable {
        validate();
        final Command current = queryByJobId(str);

        // Guard: the job must exist.
        if (current == null) {
            final String message = String.format("the command is empty corresponding to jobId: %s", str);
            logger.error(message);
            throw new RSQLClientException(message);
        }

        // Guard: refuse to delete a job that is still running.
        if (current.getStatus() == CommandStatus.RUNNING) {
            final String message = String.format("jobId=[%s] is running, terminate it first.", str);
            logger.error(message);
            throw new RSQLClientException(message);
        }

        wait4Finish(this.commandQueue.delete(str));
    }

    /**
     * Releases everything the engine owns: raises the stop flag, closes the command queue, stops
     * every registered stream, shuts the producer down and interrupts the executor.
     *
     * <p>Shutdown is best-effort: a failure in one step is logged and does not prevent the
     * remaining resources from being released.
     */
    @PreDestroy
    public void shutdown() {
        this.stop = true;
        try {
            this.commandQueue.close();
        } catch (Exception e) {
            // Still best-effort, but no longer silent: a failed close is worth knowing about.
            logger.warn("close command queue error during shutdown.", e);
        }
        try {
            for (RocketMQStream stream : this.rStreams.values()) {
                try {
                    stream.stop();
                } catch (Exception e) {
                    // One stream failing to stop must not leave the others running.
                    logger.warn("stop stream error during shutdown.", e);
                }
            }
        } finally {
            // Producer and executor are released even if stopping the streams blew up.
            try {
                this.producer.shutdown();
            } finally {
                this.executor.shutdownNow();
            }
        }
    }

    /**
     * Creates (but does not start) a producer in the {@code RSQL_PRODUCER_GROUP} group, pointed
     * at the name server address taken from the engine configuration.
     *
     * @return the newly created producer
     */
    private DefaultMQProducer producer() {
        final String nameServer = this.rsqlConfig.getNamesrvAddr();
        final DefaultMQProducer mqProducer = new DefaultMQProducer(PRODUCER_GROUP);
        mqProducer.setNamesrvAddr(nameServer);
        return mqProducer;
    }

    /**
     * Waits up to 10 seconds for the given future and turns any failure into an
     * {@link RSQLServerException}.
     *
     * <p>The future's value is itself a {@link Throwable}: {@code null} means success, a
     * non-null value is the failure reported by the command queue.
     *
     * @param completableFuture the pending operation to wait for
     * @throws RSQLServerException if the wait times out or is interrupted, if the future
     *                             completes exceptionally, or if it yields a non-null failure
     */
    private void wait4Finish(CompletableFuture<Throwable> completableFuture) {
        final Throwable reportedFailure;
        try {
            reportedFailure = completableFuture.get(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still see it.
            Thread.currentThread().interrupt();
            throw new RSQLServerException(e);
        } catch (Throwable th) {
            throw new RSQLServerException(th);
        }
        if (reportedFailure != null) {
            throw new RSQLServerException(reportedFailure);
        }
    }
}
