package biz.paluch.logging.gelf.intern.sender;

import biz.paluch.logging.gelf.intern.GelfSender;
import biz.paluch.logging.gelf.intern.GelfSenderConfiguration;
import biz.paluch.logging.gelf.intern.GelfSenderProvider;
import java.net.URI;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/* loaded from: input_file:biz/paluch/logging/gelf/intern/sender/KafkaGelfSenderProvider.class */
public class KafkaGelfSenderProvider implements GelfSenderProvider {
    private static final int BROKER_DEFAULT_PORT = 9092;

    @Override // biz.paluch.logging.gelf.intern.GelfSenderProvider
    public boolean supports(String str) {
        return str != null && str.startsWith("kafka:");
    }

    @Override // biz.paluch.logging.gelf.intern.GelfSenderProvider
    public GelfSender create(GelfSenderConfiguration gelfSenderConfiguration) {
        URI create = URI.create(gelfSenderConfiguration.getHost());
        Map<String, String> parse = QueryStringParser.parse(create);
        Properties properties = new Properties();
        for (String str : parse.keySet()) {
            properties.setProperty(str, parse.get(str));
        }
        properties.setProperty("bootstrap.servers", getBrokerServers(gelfSenderConfiguration));
        String topic = getTopic(create);
        properties.put("value.serializer", ByteArraySerializer.class.getName());
        properties.put("key.serializer", ByteArraySerializer.class.getName());
        if (properties.containsKey("acks")) {
            String property = properties.getProperty("acks");
            properties.put("acks", "0".equalsIgnoreCase(property) ? "1" : property);
        } else {
            properties.put("acks", "all");
        }
        if (!properties.containsKey("retries")) {
            properties.put("retries", 2);
        }
        return new KafkaGelfSender(new KafkaProducer(properties), topic, gelfSenderConfiguration.getErrorReporter());
    }

    private static String getBrokerServers(GelfSenderConfiguration gelfSenderConfiguration) {
        String authority;
        URI create = URI.create(gelfSenderConfiguration.getHost());
        if (create.getHost() != null) {
            authority = create.getHost() + ":" + (create.getPort() > 0 ? create.getPort() : gelfSenderConfiguration.getPort() > 0 ? gelfSenderConfiguration.getPort() : BROKER_DEFAULT_PORT);
        } else {
            authority = create.getAuthority();
        }
        if (authority == null || authority.isEmpty()) {
            throw new IllegalArgumentException("Kafka URI must specify bootstrap.servers.");
        }
        return authority;
    }

    private static String getTopic(URI uri) {
        String fragment = uri.getFragment();
        if (fragment == null || fragment.isEmpty()) {
            throw new IllegalArgumentException("Kafka URI must specify log topic as fragment.");
        }
        return fragment;
    }
}
