package org.apache.samza.storage;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/storage/ChangelogPartitionManager.class */
public class ChangelogPartitionManager extends AbstractCoordinatorStreamManager {
    private static final Logger log = LoggerFactory.getLogger(ChangelogPartitionManager.class);
    private boolean isCoordinatorConsumerRegistered;

    public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamSystemProducer, CoordinatorStreamSystemConsumer coordinatorStreamSystemConsumer, String str) {
        super(coordinatorStreamSystemProducer, coordinatorStreamSystemConsumer, str);
        this.isCoordinatorConsumerRegistered = false;
    }

    @Override // org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager
    public void register(TaskName taskName) {
        log.debug("Adding taskName {} to {}", taskName, this);
        if (!this.isCoordinatorConsumerRegistered) {
            registerCoordinatorStreamConsumer();
            this.isCoordinatorConsumerRegistered = true;
        }
        registerCoordinatorStreamProducer(taskName.getTaskName());
    }

    public Map<TaskName, Integer> readChangeLogPartitionMapping() {
        log.debug("Reading changelog partition information");
        HashMap hashMap = new HashMap();
        Iterator<CoordinatorStreamMessage> it = getBootstrappedStream(SetChangelogMapping.TYPE).iterator();
        while (it.hasNext()) {
            SetChangelogMapping setChangelogMapping = new SetChangelogMapping(it.next());
            hashMap.put(new TaskName(setChangelogMapping.getTaskName()), Integer.valueOf(setChangelogMapping.getPartition()));
            log.debug("TaskName: {} is mapped to {}", setChangelogMapping.getTaskName(), Integer.valueOf(setChangelogMapping.getPartition()));
        }
        return hashMap;
    }

    public void writeChangeLogPartitionMapping(Map<TaskName, Integer> map) {
        log.debug("Updating changelog information with: ");
        for (Map.Entry<TaskName, Integer> entry : map.entrySet()) {
            log.debug("TaskName: {} to Partition: {}", entry.getKey().getTaskName(), entry.getValue());
            send(new SetChangelogMapping(getSource(), entry.getKey().getTaskName(), entry.getValue().intValue()));
        }
    }
}
