package com.transferwise.tasks.helpers.kafka;

import com.transferwise.common.baseutils.ExceptionUtils;
import com.transferwise.tasks.TasksProperties;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.admin.RackAwareMode$Disabled$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterable;
import scala.collection.Iterator;

/* loaded from: input_file:com/transferwise/tasks/helpers/kafka/TopicManager.class */
/**
 * {@link ITopicManager} implementation that administers Kafka topics through ZooKeeper
 * using Kafka's {@code AdminUtils}.
 *
 * <p>Each public "with ZooKeeper" operation opens a fresh {@link ZkUtils} for the duration of the
 * call and closes it afterwards. {@link #callWithTopic(String, Function)} additionally guards the
 * callback with a Curator {@link InterProcessMutex} whose path is derived from the topic name.
 */
public class TopicManager implements ITopicManager {
    private static final Logger log = LoggerFactory.getLogger(TopicManager.class);

    // Scala singleton objects must be reached through MODULE$ (as done for ZKStringSerializer$ below);
    // instantiating the module class from Java would create a second "singleton" instance.
    private static final RackAwareMode RACK_AWARE_MODE = RackAwareMode$Disabled$.MODULE$;

    private final CuratorFramework curatorFramework;
    private final TasksProperties tasksProperties;

    /**
     * Creates a topic manager.
     *
     * @param curatorFramework Curator client used for the per-topic lock in
     *                         {@link #callWithTopic(String, Function)}
     * @param tasksProperties  supplies the ZooKeeper connect string and the topic replication factor
     */
    public TopicManager(CuratorFramework curatorFramework, TasksProperties tasksProperties) {
        this.curatorFramework = curatorFramework;
        this.tasksProperties = tasksProperties;
    }

    /**
     * Lists all topics reported by {@link ZkUtils#getAllTopics()}.
     *
     * @return the topic names, copied into a Java list
     */
    @Override
    public Collection<String> getAllTopics() {
        return withZookeeper(zkUtils -> cast(zkUtils.getAllTopics()));
    }

    /**
     * Reads the topic's metadata from ZooKeeper and collects its partition ids.
     *
     * @param zkUtils open ZooKeeper handle
     * @param str     topic name
     * @return topic info holding the topic name and its partition ids in ascending order
     */
    @Override
    public TopicInfo getTopicInfo(ZkUtils zkUtils, String str) {
        List<Integer> partitions = AdminUtils.fetchTopicMetadataFromZk(str, zkUtils)
            .partitionMetadata()
            .stream()
            .map(partitionMetadata -> partitionMetadata.partition())
            .sorted()
            .collect(Collectors.toList());
        return new TopicInfo(str, partitions);
    }

    /**
     * Creates a topic with the configured replication factor, default topic properties and
     * rack awareness disabled.
     *
     * @param zkUtils open ZooKeeper handle
     * @param str     topic name
     * @param i       number of partitions
     */
    @Override
    public void createTopic(ZkUtils zkUtils, String str, int i) {
        int replicationFactor = this.tasksProperties.getTopicReplicationFactor();
        AdminUtils.createTopic(zkUtils, str, i, replicationFactor, new Properties(), RACK_AWARE_MODE);
        log.info("Created Topic (name: {}, numPartitions: {}, replicationFactor: {})", str, i, replicationFactor);
    }

    /**
     * Deletes a topic via {@code AdminUtils.deleteTopic}.
     *
     * @param zkUtils open ZooKeeper handle
     * @param str     topic name
     */
    @Override
    public void deleteTopic(ZkUtils zkUtils, String str) {
        AdminUtils.deleteTopic(zkUtils, str);
        log.info("Deleted Topic (name: {})", str);
    }

    /**
     * Alters a topic's partition count via {@code AdminUtils.addPartitions}.
     *
     * @param zkUtils open ZooKeeper handle
     * @param str     topic name
     * @param i       partition count passed to {@code AdminUtils.addPartitions}
     */
    @Override
    public void addPartitions(ZkUtils zkUtils, String str, int i) {
        AdminUtils.addPartitions(zkUtils, str, i, "", true, RACK_AWARE_MODE);
        log.info(
            "Altered topic (name: {}, numPartitions: {}, replicationFactor: {})",
            str, i, this.tasksProperties.getTopicReplicationFactor());
    }

    /**
     * Ensures the topic has at least {@code i} partitions.
     *
     * <p>When no partitions are reported the topic is created; when fewer than {@code i} are
     * reported, partitions are added. A topic that already has {@code i} or more partitions is
     * left untouched.
     *
     * @param zkUtils open ZooKeeper handle
     * @param str     topic name
     * @param i       desired minimum number of partitions
     */
    @Override
    public void setPartitions(ZkUtils zkUtils, String str, int i) {
        int currentPartitions = getTopicInfo(zkUtils, str).getNumPartitions();
        if (currentPartitions == 0) {
            try {
                createTopic(zkUtils, str, i);
            } catch (TopicExistsException e) {
                // The topic appeared between the metadata read and the create call; re-evaluate.
                // NOTE(review): this retry is unbounded if the topic keeps reporting zero
                // partitions while existing - confirm whether that state can persist.
                setPartitions(zkUtils, str, i);
            }
        } else if (currentPartitions < i) {
            addPartitions(zkUtils, str, i);
        }
    }

    /** Copies a Scala collection into a new mutable Java list, preserving iteration order. */
    private <T> List<T> cast(Iterable<T> iterable) {
        List<T> result = new ArrayList<>();
        Iterator<T> it = iterable.iterator();
        while (it.hasNext()) {
            result.add(it.next());
        }
        return result;
    }

    /** Opens a new ZooKeeper connection to the configured connect string; the caller must close it. */
    private ZkUtils createZkUtils() {
        ZkConnection zkConnection = new ZkConnection(this.tasksProperties.getZookeeperConnectString(), 5000);
        return new ZkUtils(new ZkClient(zkConnection, 10000, ZKStringSerializer$.MODULE$), zkConnection, false);
    }

    /**
     * Runs {@code function} against a freshly opened {@link ZkUtils} and always closes it afterwards.
     * A failure to close is logged and does not replace the callback's result or exception.
     */
    private <T> T withZookeeper(Function<ZkUtils, T> function) {
        ZkUtils zkUtils = createZkUtils();
        try {
            return function.apply(zkUtils);
        } finally {
            try {
                zkUtils.close();
            } catch (Exception e) {
                log.warn("Failed to close ZkUtils", e);
            }
        }
    }

    /**
     * Runs {@code function} with an open {@link ZkUtils} while holding the topic's admin lock.
     *
     * <p>The lock is an {@link InterProcessMutex} at {@code /tw/kafka/locks/<topic>/admin}; the call
     * waits up to 10 seconds to acquire it and always releases it once the callback has finished.
     * Exceptions are passed through {@link ExceptionUtils#doUnchecked}.
     *
     * @param str      topic name
     * @param function callback receiving the open ZooKeeper handle
     * @param <T>      callback result type
     * @return whatever the callback returns
     * @throws IllegalStateException if the lock could not be acquired in time
     */
    @Override
    public <T> T callWithTopic(String str, Function<ZkUtils, T> function) {
        return ExceptionUtils.doUnchecked(() -> {
            String lockPath = "/tw/kafka/locks/" + str + "/admin";
            InterProcessMutex mutex = new InterProcessMutex(this.curatorFramework, lockPath);
            if (!mutex.acquire(10000L, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Could not take a lock for '" + lockPath + "'.");
            }
            try {
                return withZookeeper(function);
            } finally {
                mutex.release();
            }
        });
    }

    /**
     * Void variant of {@link #callWithTopic(String, Function)}.
     *
     * @param str      topic name
     * @param consumer callback receiving the open ZooKeeper handle
     */
    @Override
    public void runWithTopic(String str, Consumer<ZkUtils> consumer) {
        callWithTopic(str, zkUtils -> {
            consumer.accept(zkUtils);
            return null;
        });
    }
}
