package org.geneweaver.variant.orthology.transaction;

import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.geneweaver.domain.Entity;
import org.geneweaver.io.TimeInfo;
import org.geneweaver.io.connector.Connector;
import org.geneweaver.io.reader.StreamReader;
import org.geneweaver.variant.orthology.imp.AbstractImportManager;
import org.neo4j.ogm.session.Session;
import org.neo4j.ogm.session.SessionFactory;

/* loaded from: input_file:org/geneweaver/variant/orthology/transaction/ParallelImportManager.class */
public class ParallelImportManager<T extends Entity> extends AbstractImportManager<T> {
    private int parallelization;
    private SessionFactory sessionFactory;
    private ThreadFactory factory;
    private Semaphore semaphore;

    public ParallelImportManager(SessionFactory sessionFactory) {
        this(sessionFactory, null, Runtime.getRuntime().availableProcessors());
    }

    public ParallelImportManager(SessionFactory sessionFactory, Connector<T, Entity> connector) {
        this(sessionFactory, connector, Runtime.getRuntime().availableProcessors());
    }

    public ParallelImportManager(SessionFactory sessionFactory, Connector<T, Entity> connector, int i) {
        super(connector);
        this.sessionFactory = sessionFactory;
        this.connector = connector;
        i = i < 1 ? Runtime.getRuntime().availableProcessors() : i;
        this.parallelization = i;
        this.factory = Executors.defaultThreadFactory();
        this.semaphore = new Semaphore(i);
    }

    @Override // org.geneweaver.variant.orthology.imp.AbstractImportManager
    public long saveAll(StreamReader<T> streamReader, Consumer<TimeInfo> consumer) throws InterruptedException {
        return saveChunks(streamReader, () -> {
            saveChunk(this.semaphore, streamReader, consumer);
        });
    }

    @Override // org.geneweaver.variant.orthology.imp.AbstractImportManager
    protected void save(Object obj, Collection<T> collection, Consumer<TimeInfo> consumer) {
        Semaphore semaphore = (Semaphore) obj;
        TimeInfo timeInfo = consumer != null ? new TimeInfo() : null;
        Session dummySession = isDryRun() ? new DummySession() : this.sessionFactory.openSession();
        ChunkedTransactionManager chunkedTransactionManager = new ChunkedTransactionManager(dummySession);
        try {
            Collection<T> collection2 = this.connector == null ? collection : (Collection) collection.stream().flatMap(entity -> {
                return this.connector.stream(entity, dummySession);
            }).filter(entity2 -> {
                return entity2 != null;
            }).collect(Collectors.toList());
            save(chunkedTransactionManager, dummySession, collection2);
            if (timeInfo != null) {
                timeInfo.setCount(collection2.size());
            }
            this.count += collection2.size();
            if (chunkedTransactionManager != null) {
                chunkedTransactionManager.close();
            }
            semaphore.release();
            if (timeInfo != null) {
                timeInfo.close();
            }
            if (consumer != null) {
                consumer.accept(timeInfo);
            }
        } catch (Throwable th) {
            if (chunkedTransactionManager != null) {
                chunkedTransactionManager.close();
            }
            semaphore.release();
            if (timeInfo != null) {
                timeInfo.close();
            }
            throw th;
        }
    }
}
