package com.alibaba.ageiport.processor.core.eventbus.http;

import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.common.utils.StringUtils;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.dispatcher.http.HttpDispatchResponse;
import com.alibaba.ageiport.processor.core.dispatcher.http.HttpDispatcher;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.eventbus.EventBus;
import com.alibaba.ageiport.processor.core.spi.task.monitor.TaskStageEvent;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import java.util.EventListener;
import java.util.EventObject;

/* loaded from: input_file:com/alibaba/ageiport/processor/core/eventbus/http/HttpEventBus.class */
/**
 * {@link EventBus} implementation that delivers task stage events over HTTP.
 *
 * <p>Listener registration is delegated to an {@link HttpEventBusAgent}, which this class
 * deploys as a Vert.x verticle. Posting an event sends it as JSON in an HTTP {@code POST}
 * to {@link #URL} on the host recorded in the event's main task, using the port taken from
 * {@link HttpEventBusOptions}.
 */
public class HttpEventBus implements EventBus {
    /** Request URI that events are posted to. */
    public static final String URL = "/event";

    /** Timeout applied to each outgoing event request, in milliseconds. */
    private static final long REQUEST_TIMEOUT_MS = 3000L;

    /** The only HTTP status code treated as a delivered event. */
    private static final int HTTP_OK = 200;

    private static final Logger logger = LoggerFactory.getLogger(HttpEventBus.class);

    private final AgeiPort ageiPort;
    HttpClient httpClient;
    HttpEventBusAgent agent;
    HttpEventBusOptions options;

    /**
     * Creates the event bus, deploys its agent verticle and creates the HTTP client.
     *
     * @param ageiPort            framework entry point; used to obtain the {@link Vertx} bean
     *                            and, when posting, the task server client
     * @param httpEventBusOptions options for this bus; the port is read from here when posting
     */
    public HttpEventBus(AgeiPort ageiPort, HttpEventBusOptions httpEventBusOptions) {
        this.ageiPort = ageiPort;
        this.options = httpEventBusOptions;
        this.agent = new HttpEventBusAgent(ageiPort, httpEventBusOptions);
        // NOTE(review): getBean presumably returns an already-registered Vertx and only falls
        // back to the factory lambda when none exists -- confirm against AgeiPort#getBean.
        Vertx vertx = (Vertx) ageiPort.getBean(Vertx.class, ignored -> Vertx.vertx(), ageiPort);
        vertx.deployVerticle(this.agent);
        this.httpClient = vertx.createHttpClient();
    }

    /**
     * Registers a listener by delegating to the agent.
     *
     * @param eventListener listener to register
     */
    @Override
    public void register(EventListener eventListener) {
        this.agent.register(eventListener);
    }

    /**
     * Unregisters a listener by delegating to the agent.
     *
     * @param eventListener listener to unregister
     */
    @Override
    public void unregister(EventListener eventListener) {
        this.agent.unregister(eventListener);
    }

    /**
     * Posts an event to the host recorded in the event's main task.
     *
     * <p>The main task is looked up through the task server client before the request is
     * issued. The HTTP exchange itself is asynchronous: this method returns once the request
     * has been handed to the client, and every outcome (request failure, response failure,
     * non-200 status, unsuccessful response body) is reported through the logger only.
     *
     * @param eventObject the event to post; it is cast to {@link TaskStageEvent}, so any other
     *                    type results in a {@link ClassCastException}
     */
    @Override
    public void post(EventObject eventObject) {
        TaskStageEvent taskStageEvent = (TaskStageEvent) eventObject;
        MainTask mainTask = this.ageiPort.getTaskServerClient().getMainTask(taskStageEvent.getMainTaskId());
        String host = mainTask.getHost();

        RequestOptions requestOptions = new RequestOptions()
                .setHost(host)
                .setPort(this.options.getPort())
                .setMethod(HttpMethod.POST)
                .setURI(URL)
                .setTimeout(REQUEST_TIMEOUT_MS);

        // Built once up front so every log line of this exchange identifies the same event.
        String eventSummary = StringUtils.format("main:{}, sub:{}, ip:{}, stage:{}", new Object[]{taskStageEvent.getMainTaskId(), taskStageEvent.getSubTaskId(), host, taskStageEvent.getStage()});

        this.httpClient.request(requestOptions, requestResult -> {
            if (!requestResult.succeeded()) {
                logger.error("post request failed, {}, cause:{}", new Object[]{eventSummary, String.valueOf(requestResult.cause())});
                return;
            }
            HttpClientRequest request = requestResult.result();
            String jsonString = JsonUtil.toJsonString(eventObject);
            logger.info("send:{}", new Object[]{jsonString});
            request.send(jsonString, responseResult -> {
                if (!responseResult.succeeded()) {
                    logger.error("post response failed, {}, cause:{}", new Object[]{eventSummary, String.valueOf(responseResult.cause())});
                    return;
                }
                HttpClientResponse response = responseResult.result();
                if (response.statusCode() != HTTP_OK) {
                    logger.error("post response failed, {}, statusCode:{}", new Object[]{eventSummary, response.statusCode()});
                    return;
                }
                response.bodyHandler(body -> {
                    String resultJson = body.toString();
                    HttpDispatchResponse dispatchResponse = (HttpDispatchResponse) JsonUtil.toObject(resultJson, HttpDispatchResponse.class);
                    // A missing body object or a success flag other than TRUE both count as failure.
                    if (dispatchResponse == null || !Boolean.TRUE.equals(dispatchResponse.getSuccess())) {
                        logger.error("post event failed, message:{}, resultJson:{}", new Object[]{eventSummary, resultJson});
                    } else {
                        logger.debug("post event success, {}", new Object[]{eventSummary});
                    }
                });
            });
        });
    }
}
