package io.github.embeddedkafka.ops;

import io.github.embeddedkafka.EmbeddedKafkaConfig;
import io.github.embeddedkafka.KafkaUnavailableException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import scala.Function1;
import scala.Function2;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.jdk.CollectionConverters$;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Try$;

/* compiled from: ProducerOps.scala */
/* loaded from: input_file:io/github/embeddedkafka/ops/ProducerOps.class */
public interface ProducerOps<C extends EmbeddedKafkaConfig> {
    static void $init$(ProducerOps producerOps) {
        producerOps.io$github$embeddedkafka$ops$ProducerOps$_setter_$producerPublishTimeout_$eq(new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds());
    }

    FiniteDuration producerPublishTimeout();

    void io$github$embeddedkafka$ops$ProducerOps$_setter_$producerPublishTimeout_$eq(FiniteDuration finiteDuration);

    Map<String, Object> baseProducerConfig(C c);

    default Map<String, Object> defaultProducerConf(C c) {
        return (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), new StringBuilder(10).append("localhost:").append(c.kafkaPort()).toString()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("max.block.ms"), BoxesRunTime.boxToInteger(10000).toString()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("retry.backoff.ms"), BoxesRunTime.boxToInteger(1000).toString())}));
    }

    default void publishStringMessageToKafka(String str, String str2, C c) {
        publishToKafka(str, str2, c, new StringSerializer());
    }

    default <T> void publishToKafka(String str, T t, C c, Serializer<T> serializer) throws KafkaUnavailableException {
        publishToKafka(new KafkaProducer<>(CollectionConverters$.MODULE$.MapHasAsJava(baseProducerConfig(c)).asJava(), new StringSerializer(), serializer), new ProducerRecord<>(str, t));
    }

    /* JADX WARN: Multi-variable type inference failed */
    default <T> void publishToKafka(ProducerRecord<String, T> producerRecord, C c, Serializer<T> serializer) throws KafkaUnavailableException {
        publishToKafka(new KafkaProducer<>(CollectionConverters$.MODULE$.MapHasAsJava(baseProducerConfig(c)).asJava(), new StringSerializer(), serializer), producerRecord);
    }

    default <K, T> void publishToKafka(String str, K k, T t, C c, Serializer<K> serializer, Serializer<T> serializer2) throws KafkaUnavailableException {
        publishToKafka(new KafkaProducer<>(CollectionConverters$.MODULE$.MapHasAsJava(baseProducerConfig(c)).asJava(), serializer, serializer2), new ProducerRecord<>(str, k, t));
    }

    default <K, T> void publishToKafka(String str, Seq<Tuple2<K, T>> seq, C c, Serializer<K> serializer, Serializer<T> serializer2) throws KafkaUnavailableException {
        KafkaProducer kafkaProducer = new KafkaProducer(CollectionConverters$.MODULE$.MapHasAsJava(baseProducerConfig(c)).asJava(), serializer, serializer2);
        Function2 function2 = (obj, obj2) -> {
            return new ProducerRecord(str, obj, obj2);
        };
        Seq seq2 = (Seq) ((Seq) seq.map(function2.tupled().andThen(producerRecord -> {
            return kafkaProducer.send(producerRecord);
        }))).map(future -> {
            return Try$.MODULE$.apply(() -> {
                return r1.$anonfun$3$$anonfun$1(r2);
            });
        });
        kafkaProducer.close();
        seq2.collectFirst(new ProducerOps$$anon$1());
    }

    default <K, V, T> T withProducer(Function1<KafkaProducer<K, V>, T> function1, C c, Serializer<K> serializer, Serializer<V> serializer2) {
        return (T) io.github.embeddedkafka.package$.MODULE$.loanAndClose(new KafkaProducer(CollectionConverters$.MODULE$.MapHasAsJava(baseProducerConfig(c)).asJava(), serializer, serializer2), function1);
    }

    private default <K, T> void publishToKafka(KafkaProducer<K, T> kafkaProducer, ProducerRecord<K, T> producerRecord) {
        Future send = kafkaProducer.send(producerRecord);
        Failure apply = Try$.MODULE$.apply(() -> {
            return r1.$anonfun$4(r2);
        });
        kafkaProducer.close();
        if (apply instanceof Failure) {
            throw new KafkaUnavailableException(apply.exception());
        }
    }

    private default RecordMetadata $anonfun$3$$anonfun$1(Future future) {
        return (RecordMetadata) future.get(producerPublishTimeout().length(), producerPublishTimeout().unit());
    }

    private default RecordMetadata $anonfun$4(Future future) {
        return (RecordMetadata) future.get(producerPublishTimeout().length(), producerPublishTimeout().unit());
    }
}
