package com.alibaba.ageiport.processor.core.dispatcher.http;

import com.alibaba.ageiport.common.collections.Lists;
import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.TaskSpec;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.cluster.Node;
import com.alibaba.ageiport.processor.core.spi.dispatcher.Dispatcher;
import com.alibaba.ageiport.processor.core.spi.dispatcher.RootDispatcherContext;
import com.alibaba.ageiport.processor.core.spi.dispatcher.SubDispatcherContext;
import com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskWorker;
import com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskWorkerFactory;
import com.alibaba.ageiport.processor.core.task.monitor.ClearTask;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/* loaded from: input_file:BOOT-INF/lib/ageiport-processor-core-0.2.9.jar:com/alibaba/ageiport/processor/core/dispatcher/http/HttpDispatcher.class */
public class HttpDispatcher implements Dispatcher {
    public static final String URL = "/subTasks";
    private static final Logger logger = LoggerFactory.getLogger(HttpDispatcher.class);
    private AgeiPort ageiPort;
    private HttpDispatcherOptions options;
    private Map<String, Node> failedNodeMap = new ConcurrentHashMap();
    private DispatchQueue dispatchQueue = new DispatchQueue();
    private ClearTask clearTask = new ClearTask("HttpDispatcher clear task");
    private int nodeIndex;
    HttpClient httpClient;

    public HttpDispatcher(AgeiPort ageiPort, HttpDispatcherOptions httpDispatcherOptions) {
        this.ageiPort = ageiPort;
        this.options = httpDispatcherOptions;
        HttpDispatcherAgent httpDispatcherAgent = new HttpDispatcherAgent(ageiPort, this);
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(httpDispatcherAgent);
        this.httpClient = vertx.createHttpClient();
        new Thread(() -> {
            while (true) {
                doDispatchInClient();
            }
        }).start();
    }

    private void doDispatchInClient() {
        try {
            SubDispatcherContext subDispatcherContext = this.dispatchQueue.get();
            Node nextNode = getNextNode();
            int i = 0;
            while (this.failedNodeMap.containsKey(nextNode.getId())) {
                nextNode = getNextNode();
                int i2 = i;
                i++;
                if (i2 > this.ageiPort.getClusterManager().getNodes().size() - this.failedNodeMap.size()) {
                    throw new RuntimeException("no ok node");
                }
            }
            dispatchToNode(subDispatcherContext, nextNode);
        } catch (Throwable th) {
            logger.error("doDispatchInClient failed, context:{}", null, th);
        }
    }

    private void dispatchToNode(SubDispatcherContext subDispatcherContext, Node node) {
        HttpDispatchRequest httpDispatchRequest = new HttpDispatchRequest(subDispatcherContext.getMainTaskId(), subDispatcherContext.getSubTaskNos());
        RequestOptions requestOptions = new RequestOptions();
        requestOptions.setHost(node.getHost()).setPort(this.options.getPort()).setMethod(HttpMethod.POST).setURI(URL).setTimeout(3000L);
        this.httpClient.request(requestOptions, asyncResult -> {
            if (asyncResult.succeeded()) {
                ((HttpClientRequest) asyncResult.result()).send(JsonUtil.toJsonString(httpDispatchRequest), asyncResult -> {
                    if (asyncResult.succeeded()) {
                        ((HttpClientResponse) asyncResult.result()).bodyHandler(buffer -> {
                            String buffer = buffer.toString();
                            if (((HttpDispatchResponse) JsonUtil.toObject(buffer, HttpDispatchResponse.class)).getSuccess().booleanValue()) {
                                logger.info("dispatchToNode success, main:{}, ip{}, nos:{}, labels{}", subDispatcherContext.getMainTaskId(), node.getHost(), subDispatcherContext.getSubTaskNos(), subDispatcherContext.getLabels());
                            } else {
                                logger.error("dispatchToNode server response failed, ", buffer);
                                dispatchFailed(node, subDispatcherContext);
                            }
                        });
                    } else {
                        logger.error("dispatchToNode handle response, ", asyncResult.cause());
                        dispatchFailed(node, subDispatcherContext);
                    }
                });
            } else {
                logger.error("dispatchToNode get request failed, ", asyncResult.cause());
                dispatchFailed(node, subDispatcherContext);
            }
        });
    }

    private void dispatchFailed(Node node, SubDispatcherContext subDispatcherContext) {
        logger.error("dispatchFailed, main:{}, ip:{}, nos:{}, labels:{}", subDispatcherContext.getMainTaskId(), node.getHost(), subDispatcherContext.getSubTaskNos(), subDispatcherContext.getLabels());
        this.failedNodeMap.put(node.getId(), node);
        this.clearTask.addClearTask(node.getId(), this.options.getNodeFallbackMs().intValue(), () -> {
            this.failedNodeMap.remove(node.getId());
        });
        this.dispatchQueue.add(subDispatcherContext);
    }

    private Node getNextNode() {
        List<Node> nodes = this.ageiPort.getClusterManager().getNodes();
        int size = nodes.size();
        Node node = nodes.get(this.nodeIndex % size);
        this.nodeIndex = (this.nodeIndex + 1) % size;
        return node;
    }

    @Override // com.alibaba.ageiport.processor.core.spi.dispatcher.Dispatcher
    public void dispatchMainTaskPrepare(RootDispatcherContext rootDispatcherContext) {
        MainTask mainTask = this.ageiPort.getTaskServerClient().getMainTask(rootDispatcherContext.getMainTaskId());
        MainTaskWorker create = ((MainTaskWorkerFactory) this.ageiPort.getTaskSpiSelector().selectExtension(this.ageiPort.getSpecificationRegistry().get(mainTask.getCode()).getExecuteType(), mainTask.getType(), mainTask.getCode(), MainTaskWorkerFactory.class)).create(this.ageiPort, mainTask);
        create.isReduce(false);
        this.ageiPort.getMainWorkerExecutor().submit(create);
    }

    @Override // com.alibaba.ageiport.processor.core.spi.dispatcher.Dispatcher
    public void dispatchSubTasks(SubDispatcherContext subDispatcherContext) {
        for (List<Integer> list : Lists.averageAssign(subDispatcherContext.getSubTaskNos(), this.ageiPort.getClusterManager().getNodes().size())) {
            SubDispatcherContext subDispatcherContext2 = new SubDispatcherContext();
            subDispatcherContext2.setMainTaskId(subDispatcherContext.getMainTaskId());
            subDispatcherContext2.setSubTaskNos(list);
            subDispatcherContext2.setLabels(subDispatcherContext.getLabels());
            this.dispatchQueue.add(subDispatcherContext2);
        }
    }

    @Override // com.alibaba.ageiport.processor.core.spi.dispatcher.Dispatcher
    public void dispatchMainTaskReduce(RootDispatcherContext rootDispatcherContext) {
        MainTask mainTask = this.ageiPort.getTaskServerClient().getMainTask(rootDispatcherContext.getMainTaskId());
        TaskSpec taskSpec = this.ageiPort.getSpecificationRegistry().get(mainTask.getCode());
        MainTaskWorker create = ((MainTaskWorkerFactory) this.ageiPort.getTaskSpiSelector().selectExtension(taskSpec.getExecuteType(), taskSpec.getTaskType(), mainTask.getCode(), MainTaskWorkerFactory.class)).create(this.ageiPort, mainTask);
        create.isReduce(true);
        this.ageiPort.getMainWorkerExecutor().submit(create);
    }

    public AgeiPort getAgeiPort() {
        return this.ageiPort;
    }

    public HttpDispatcherOptions getOptions() {
        return this.options;
    }

    public Map<String, Node> getFailedNodeMap() {
        return this.failedNodeMap;
    }

    public DispatchQueue getDispatchQueue() {
        return this.dispatchQueue;
    }

    public ClearTask getClearTask() {
        return this.clearTask;
    }

    public int getNodeIndex() {
        return this.nodeIndex;
    }

    public HttpClient getHttpClient() {
        return this.httpClient;
    }

    public void setAgeiPort(AgeiPort ageiPort) {
        this.ageiPort = ageiPort;
    }

    public void setOptions(HttpDispatcherOptions httpDispatcherOptions) {
        this.options = httpDispatcherOptions;
    }

    public void setFailedNodeMap(Map<String, Node> map) {
        this.failedNodeMap = map;
    }

    public void setDispatchQueue(DispatchQueue dispatchQueue) {
        this.dispatchQueue = dispatchQueue;
    }

    public void setClearTask(ClearTask clearTask) {
        this.clearTask = clearTask;
    }

    public void setNodeIndex(int i) {
        this.nodeIndex = i;
    }

    public void setHttpClient(HttpClient httpClient) {
        this.httpClient = httpClient;
    }
}
