package kinesis4cats.kcl.fs2;

import cats.Parallel;
import cats.UnorderedFoldable$;
import cats.effect.kernel.Async;
import cats.effect.kernel.Resource;
import cats.effect.kernel.syntax.EffectResourceOps$;
import cats.effect.std.Queue;
import cats.syntax.IfMOps$;
import cats.syntax.ParallelTraversable_Ops$;
import cats.syntax.package$all$;
import fs2.Stream;
import fs2.Stream$;
import fs2.concurrent.SignallingRef$;
import java.util.UUID;
import kinesis4cats.compat.retry.RetryPolicies$;
import kinesis4cats.compat.retry.Sleep$;
import kinesis4cats.compat.retry.package$;
import kinesis4cats.kcl.CommittableRecord;
import kinesis4cats.kcl.CommittableRecord$;
import kinesis4cats.kcl.KCLConsumer;
import kinesis4cats.kcl.KCLConsumer$;
import kinesis4cats.kcl.KCLConsumer$ProcessConfig$;
import kinesis4cats.kcl.RecordProcessor;
import kinesis4cats.kcl.RecordProcessor$Config$;
import kinesis4cats.kcl.fs2.KCLConsumerFS2;
import kinesis4cats.kcl.multistream.MultiStreamTracker;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.collection.immutable.List;
import scala.runtime.BoxesRunTime;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.retrieval.RetrievalConfig;

/* compiled from: KCLConsumerFS2.scala */
/* loaded from: input_file:kinesis4cats/kcl/fs2/KCLConsumerFS2$.class */
/**
 * Decompiled form of the Scala companion object for {@link KCLConsumerFS2}.
 *
 * <p>The single instance is published through {@link #MODULE$}, which the private
 * constructor assigns. The methods here build {@code KCLConsumerFS2} resources from
 * various configuration inputs, and provide the queue callback, the record stream and
 * the checkpoint-committing pipe.
 *
 * <p>Methods named {@code xxx$default$N} follow the Scala compiler's naming for the
 * default value of the N-th parameter of {@code xxx}.
 */
public final class KCLConsumerFS2$ {
    /** Singleton instance; assigned by the private constructor run from the static initializer. */
    public static KCLConsumerFS2$ MODULE$;
    /** Process configuration computed once in the constructor; see {@link #defaultProcessConfig()}. */
    private final KCLConsumer.ProcessConfig defaultProcessConfig;

    static {
        // Instantiating the class is what populates MODULE$ (the constructor assigns it).
        new KCLConsumerFS2$();
    }

    /**
     * Returns the process configuration built in the constructor: the default
     * {@code KCLConsumer.ProcessConfig} with its second component replaced by a modified
     * default {@code RecordProcessor.Config}.
     *
     * @return the shared default process configuration
     */
    public KCLConsumer.ProcessConfig defaultProcessConfig() {
        return this.defaultProcessConfig;
    }

    /**
     * Builds a function that offers every record of a list to the given queue.
     *
     * @param queue queue that receives each record via {@code offer}
     * @param async effect typeclass instance used by {@code traverse_}
     * @return a function from a list of records to an effect that offers each record,
     *         traversing the list with {@code traverse_}
     */
    public <F> Function1<List<CommittableRecord<F>>, F> callback(Queue<F, CommittableRecord<F>> queue, Async<F> async) {
        return list -> {
            return package$all$.MODULE$.toFoldableOps(list, UnorderedFoldable$.MODULE$.catsTraverseForList()).traverse_(committableRecord -> {
                return queue.offer(committableRecord);
            }, async);
        };
    }

    /**
     * Creates a {@code KCLConsumerFS2} resource from explicit KCL configuration objects.
     *
     * <p>Delegates to {@code KCLConsumerFS2.Config.create} and wraps the resulting config
     * in a new {@code KCLConsumerFS2}.
     *
     * @param checkpointConfig KCL checkpoint configuration
     * @param coordinatorConfig KCL coordinator configuration
     * @param leaseManagementConfig KCL lease management configuration
     * @param lifecycleConfig KCL lifecycle configuration
     * @param metricsConfig KCL metrics configuration
     * @param retrievalConfig KCL retrieval configuration
     * @param fS2Config FS2-specific settings; default supplied by {@link #apply$default$7()}
     * @param processConfig process settings; default supplied by {@link #apply$default$8()}
     * @param async effect typeclass instance
     * @param parallel parallel typeclass instance passed to the consumer
     * @param logEncoders log encoders forwarded to config creation
     * @return a resource yielding the consumer
     */
    public <F> Resource<F, KCLConsumerFS2<F>> apply(CheckpointConfig checkpointConfig, CoordinatorConfig coordinatorConfig, LeaseManagementConfig leaseManagementConfig, LifecycleConfig lifecycleConfig, MetricsConfig metricsConfig, RetrievalConfig retrievalConfig, KCLConsumerFS2.FS2Config fS2Config, KCLConsumer.ProcessConfig processConfig, Async<F> async, Parallel<F> parallel, RecordProcessor.LogEncoders logEncoders) {
        return KCLConsumerFS2$Config$.MODULE$.create(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, retrievalConfig, fS2Config, processConfig, async, logEncoders).map(config -> {
            return new KCLConsumerFS2(config, async, parallel);
        });
    }

    /**
     * Default for the 7th parameter ({@code fS2Config}) of {@code apply}.
     *
     * @return the value of {@code FS2Config.m3default()} (NOTE(review): {@code m3default}
     *         looks like a decompiler rename of Scala's {@code default} - confirm)
     */
    public <F> KCLConsumerFS2.FS2Config apply$default$7() {
        return KCLConsumerFS2$FS2Config$.MODULE$.m3default();
    }

    /**
     * Default for the 8th parameter ({@code processConfig}) of {@code apply}.
     *
     * @return {@link #defaultProcessConfig()}
     */
    public <F> KCLConsumer.ProcessConfig apply$default$8() {
        return defaultProcessConfig();
    }

    /**
     * Creates a {@code KCLConsumerFS2} resource from AWS clients and identifying strings.
     *
     * <p>Delegates to {@code KCLConsumerFS2.Config.configsBuilder} and wraps the resulting
     * config in a new {@code KCLConsumerFS2}.
     *
     * @param kinesisAsyncClient Kinesis client
     * @param dynamoDbAsyncClient DynamoDB client
     * @param cloudWatchAsyncClient CloudWatch client
     * @param str first string argument (presumably the stream name - TODO confirm)
     * @param str2 second string argument (presumably the application name - TODO confirm)
     * @param fS2Config FS2-specific settings; default from {@link #configsBuilder$default$6()}
     * @param str3 third string argument; defaults to a random UUID string
     *        (presumably the worker identifier - TODO confirm)
     * @param processConfig process settings; default from {@link #configsBuilder$default$8()}
     * @param function1 transformation applied to the underlying {@code KCLConsumer.Config};
     *        the default is the identity function
     * @param async effect typeclass instance
     * @param parallel parallel typeclass instance passed to the consumer
     * @param logEncoders log encoders forwarded to config creation
     * @return a resource yielding the consumer
     */
    public <F> Resource<F, KCLConsumerFS2<F>> configsBuilder(KinesisAsyncClient kinesisAsyncClient, DynamoDbAsyncClient dynamoDbAsyncClient, CloudWatchAsyncClient cloudWatchAsyncClient, String str, String str2, KCLConsumerFS2.FS2Config fS2Config, String str3, KCLConsumer.ProcessConfig processConfig, Function1<KCLConsumer.Config<F>, KCLConsumer.Config<F>> function1, Async<F> async, Parallel<F> parallel, RecordProcessor.LogEncoders logEncoders) {
        return KCLConsumerFS2$Config$.MODULE$.configsBuilder(kinesisAsyncClient, dynamoDbAsyncClient, cloudWatchAsyncClient, str, str2, fS2Config, str3, processConfig, function1, async, logEncoders).map(config -> {
            return new KCLConsumerFS2(config, async, parallel);
        });
    }

    /** Default for the 6th parameter ({@code fS2Config}) of {@code configsBuilder}. */
    public <F> KCLConsumerFS2.FS2Config configsBuilder$default$6() {
        return KCLConsumerFS2$FS2Config$.MODULE$.m3default();
    }

    /**
     * Default for the 7th parameter ({@code str3}) of {@code configsBuilder}.
     *
     * @return a freshly generated random UUID rendered as a string (a new value per call)
     */
    public <F> String configsBuilder$default$7() {
        return UUID.randomUUID().toString();
    }

    /** Default for the 8th parameter ({@code processConfig}) of {@code configsBuilder}. */
    public <F> KCLConsumer.ProcessConfig configsBuilder$default$8() {
        return defaultProcessConfig();
    }

    /**
     * Default for the 9th parameter ({@code function1}) of {@code configsBuilder}.
     *
     * <p>The parameters mirror the first eight arguments of {@code configsBuilder} but are
     * not used by the body.
     *
     * @return the identity function on {@code KCLConsumer.Config}
     */
    public <F> Function1<KCLConsumer.Config<F>, KCLConsumer.Config<F>> configsBuilder$default$9(KinesisAsyncClient kinesisAsyncClient, DynamoDbAsyncClient dynamoDbAsyncClient, CloudWatchAsyncClient cloudWatchAsyncClient, String str, String str2, KCLConsumerFS2.FS2Config fS2Config, String str3, KCLConsumer.ProcessConfig processConfig) {
        return config -> {
            return config;
        };
    }

    /**
     * Multi-stream variant of {@code configsBuilder}: takes a {@code MultiStreamTracker}
     * in place of one of the string arguments.
     *
     * <p>Delegates to {@code KCLConsumerFS2.Config.configsBuilderMultiStream} and wraps the
     * resulting config in a new {@code KCLConsumerFS2}.
     *
     * @param kinesisAsyncClient Kinesis client
     * @param dynamoDbAsyncClient DynamoDB client
     * @param cloudWatchAsyncClient CloudWatch client
     * @param multiStreamTracker tracker forwarded to the config builder
     * @param str first string argument (presumably the application name - TODO confirm)
     * @param fS2Config FS2-specific settings; default from
     *        {@link #configsBuilderMultiStream$default$6()}
     * @param str2 second string argument; defaults to a random UUID string
     *        (presumably the worker identifier - TODO confirm)
     * @param processConfig process settings; default from
     *        {@link #configsBuilderMultiStream$default$8()}
     * @param function1 transformation applied to the underlying {@code KCLConsumer.Config};
     *        the default is the identity function
     * @param async effect typeclass instance
     * @param parallel parallel typeclass instance passed to the consumer
     * @param logEncoders log encoders forwarded to config creation
     * @return a resource yielding the consumer
     */
    public <F> Resource<F, KCLConsumerFS2<F>> configsBuilderMultiStream(KinesisAsyncClient kinesisAsyncClient, DynamoDbAsyncClient dynamoDbAsyncClient, CloudWatchAsyncClient cloudWatchAsyncClient, MultiStreamTracker multiStreamTracker, String str, KCLConsumerFS2.FS2Config fS2Config, String str2, KCLConsumer.ProcessConfig processConfig, Function1<KCLConsumer.Config<F>, KCLConsumer.Config<F>> function1, Async<F> async, Parallel<F> parallel, RecordProcessor.LogEncoders logEncoders) {
        return KCLConsumerFS2$Config$.MODULE$.configsBuilderMultiStream(kinesisAsyncClient, dynamoDbAsyncClient, cloudWatchAsyncClient, multiStreamTracker, str, fS2Config, str2, processConfig, function1, async, logEncoders).map(config -> {
            return new KCLConsumerFS2(config, async, parallel);
        });
    }

    /** Default for the 6th parameter ({@code fS2Config}) of {@code configsBuilderMultiStream}. */
    public <F> KCLConsumerFS2.FS2Config configsBuilderMultiStream$default$6() {
        return KCLConsumerFS2$FS2Config$.MODULE$.m3default();
    }

    /**
     * Default for the 7th parameter ({@code str2}) of {@code configsBuilderMultiStream}.
     *
     * @return a freshly generated random UUID rendered as a string (a new value per call)
     */
    public <F> String configsBuilderMultiStream$default$7() {
        return UUID.randomUUID().toString();
    }

    /** Default for the 8th parameter ({@code processConfig}) of {@code configsBuilderMultiStream}. */
    public <F> KCLConsumer.ProcessConfig configsBuilderMultiStream$default$8() {
        return defaultProcessConfig();
    }

    /**
     * Default for the 9th parameter ({@code function1}) of {@code configsBuilderMultiStream}.
     *
     * <p>The parameters mirror the first eight arguments of
     * {@code configsBuilderMultiStream} but are not used by the body.
     *
     * @return the identity function on {@code KCLConsumer.Config}
     */
    public <F> Function1<KCLConsumer.Config<F>, KCLConsumer.Config<F>> configsBuilderMultiStream$default$9(KinesisAsyncClient kinesisAsyncClient, DynamoDbAsyncClient dynamoDbAsyncClient, CloudWatchAsyncClient cloudWatchAsyncClient, MultiStreamTracker multiStreamTracker, String str, KCLConsumerFS2.FS2Config fS2Config, String str2, KCLConsumer.ProcessConfig processConfig) {
        return config -> {
            return config;
        };
    }

    /**
     * Runs the underlying KCL consumer and exposes the records placed on the config's
     * queue as a stream.
     *
     * <p>A boolean signal is created with the initial value {@code false}. The consumer
     * resource built from {@code config.underlying()} gets a finalizer that sets the signal
     * to {@code true}, and the returned stream is wired with {@code interruptWhen} on that
     * same signal.
     *
     * @param config consumer configuration supplying {@code underlying()} and {@code queue()}
     * @param async effect typeclass instance
     * @return a resource whose value is the stream of records read from {@code config.queue()}
     */
    public <F> Resource<F, Stream<F, CommittableRecord<F>>> stream(KCLConsumerFS2.Config<F> config, Async<F> async) {
        // Allocate the interruption signal (initially false) as a resource step.
        return EffectResourceOps$.MODULE$.toResource$extension(cats.effect.syntax.package$all$.MODULE$.effectResourceOps(SignallingRef$.MODULE$.apply(BoxesRunTime.boxToBoolean(false), async))).flatMap(signallingRef -> {
            // Run the consumer; its finalizer flips the signal to true.
            return KCLConsumer$.MODULE$.run(config.underlying(), async).onFinalize(signallingRef.set(BoxesRunTime.boxToBoolean(true)), async).map(boxedUnit -> {
                // Queue-backed stream using the default second argument, tied to the signal.
                return Stream$.MODULE$.fromQueueUnterminated(config.queue(), Stream$.MODULE$.fromQueueUnterminated$default$2(), async).interruptWhen(signallingRef, async);
            });
        });
    }

    /**
     * Builds a stream transformation that checkpoints records in batches and then
     * re-emits them.
     *
     * <p>Records are batched with {@code groupWithin} using {@code commitMaxChunk} and
     * {@code commitMaxWait}. Each batch is grouped by {@code shardId()}, and the groups are
     * processed with {@code parTraverse_}. For each group, the greatest record under
     * {@code CommittableRecord.orderBySequenceNumber()} is selected. If its
     * {@code canCheckpoint()} effect yields true, {@code checkpoint()} is run under
     * {@code retryingOnAllErrors}; otherwise nothing is done. The retry policy is
     * {@code limitRetries(commitMaxRetries)} joined with
     * {@code constantDelay(commitRetryInterval)}, with a no-op error handler. The batches
     * are flattened back into individual records with {@code unchunks}.
     *
     * @param config consumer configuration supplying {@code fs2Config()}
     * @param async effect typeclass instance
     * @param parallel parallel typeclass instance used by {@code parTraverse_}
     * @return a function from a record stream to a record stream with the checkpoint step
     *         attached via {@code evalTap}
     */
    public <F> Function1<Stream<F, CommittableRecord<F>>, Stream<F, CommittableRecord<F>>> commitRecords(KCLConsumerFS2.Config<F> config, Async<F> async, Parallel<F> parallel) {
        return stream -> {
            return stream.groupWithin(config.fs2Config().commitMaxChunk(), config.fs2Config().commitMaxWait(), async).evalTap(chunk -> {
                // Group the batch by shard so each shard is checkpointed independently.
                return ParallelTraversable_Ops$.MODULE$.parTraverse_$extension(package$all$.MODULE$.catsSyntaxParallelTraverse_(chunk.toList().groupBy(committableRecord -> {
                    return committableRecord.shardId();
                }).toList(), UnorderedFoldable$.MODULE$.catsTraverseForList()), tuple2 -> {
                    // Null guard emitted for the Scala tuple pattern match.
                    if (tuple2 == null) {
                        throw new MatchError(tuple2);
                    }
                    // Only the greatest record of the shard's group (by sequence-number ordering) is checkpointed.
                    CommittableRecord committableRecord2 = (CommittableRecord) ((List) tuple2._2()).max(CommittableRecord$.MODULE$.orderBySequenceNumber());
                    return IfMOps$.MODULE$.ifM$extension(package$all$.MODULE$.catsSyntaxIfM(committableRecord2.canCheckpoint(), async), () -> {
                        // Retry the checkpoint on any error: bounded retry count combined with a constant delay.
                        return package$.MODULE$.retryingOnAllErrors().apply(RetryPolicies$.MODULE$.limitRetries(config.fs2Config().commitMaxRetries(), async).join(RetryPolicies$.MODULE$.constantDelay(config.fs2Config().commitRetryInterval(), async), async), package$.MODULE$.noop(async), () -> {
                            return committableRecord2.checkpoint();
                        }, async, Sleep$.MODULE$.sleepUsingTemporal(async));
                    }, () -> {
                        // Record cannot be checkpointed: do nothing.
                        return async.unit();
                    }, async);
                }, UnorderedFoldable$.MODULE$.catsTraverseForList(), parallel);
            }, async).unchunks(Predef$.MODULE$.$conforms());
        };
    }

    /**
     * Publishes the singleton and computes {@link #defaultProcessConfig}.
     *
     * <p>The default {@code RecordProcessor.Config} is copied with its first three
     * components kept and the fourth set to {@code false} (NOTE(review): presumably an
     * auto-commit flag - confirm against {@code RecordProcessor.Config}). That copy then
     * replaces the second component of the default {@code KCLConsumer.ProcessConfig}.
     */
    private KCLConsumerFS2$() {
        MODULE$ = this;
        RecordProcessor.Config copy = RecordProcessor$Config$.MODULE$.default().copy(RecordProcessor$Config$.MODULE$.default().copy$default$1(), RecordProcessor$Config$.MODULE$.default().copy$default$2(), RecordProcessor$Config$.MODULE$.default().copy$default$3(), false);
        this.defaultProcessConfig = KCLConsumer$ProcessConfig$.MODULE$.default().copy(KCLConsumer$ProcessConfig$.MODULE$.default().copy$default$1(), copy, KCLConsumer$ProcessConfig$.MODULE$.default().copy$default$3());
    }
}
