package org.apache.samza.coordinator;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/apache/samza/coordinator/StreamPartitionCountMonitor.class */
/**
 * Periodically compares the current partition count of a set of streams against the partition
 * count captured when this monitor was constructed, and publishes the difference
 * ({@code current - initial}) through one {@link Gauge} per stream in the
 * {@code "job-coordinator"} metrics group.
 *
 * <p>Lifecycle: a monitor is created in {@code INIT}, moves to {@code RUNNING} on
 * {@link #start()} and to {@code STOPPED} on {@link #stop()}. A stopped monitor cannot be
 * restarted. State transitions are serialized on an internal lock.
 */
public class StreamPartitionCountMonitor {
    private static final Logger log = LoggerFactory.getLogger(StreamPartitionCountMonitor.class);
    private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl();

    private final Set<SystemStream> streamsToMonitor;
    private final StreamMetadataCache metadataCache;
    private final MetricsRegistryMap metrics;
    private final int monitorPeriodMs;
    /** Unmodifiable map holding one gauge per stream present in {@link #initialMetadata}. */
    private final Map<SystemStream, Gauge<Integer>> gauges;
    /** Metadata snapshot taken in the constructor; the baseline for every later comparison. */
    private final Map<SystemStream, SystemStreamMetadata> initialMetadata;
    /** Guards the state transitions performed by {@link #start()} and {@link #stop()}. */
    private final Object lock = new Object();
    private final ScheduledExecutorService schedulerService = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
    /** Volatile so that {@link #isRunning()} can read it without taking {@link #lock}. */
    private volatile State state = State.INIT;

    /** Lifecycle states of the monitor. */
    public enum State {
        INIT,
        RUNNING,
        STOPPED
    }

    /** Creates the monitor thread, named {@code Samza-StreamPartitionCountMonitor-<n>}. */
    private static class ThreadFactoryImpl implements ThreadFactory {
        private static final String PREFIX = "Samza-" + StreamPartitionCountMonitor.class.getSimpleName() + "-";
        private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();

        private ThreadFactoryImpl() {
        }

        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
        }
    }

    /**
     * Fetches metadata for the given streams from the cache and exposes it as a Java map.
     *
     * @param set the streams to look up
     * @param streamMetadataCache the cache to query
     * @return a Java view of the Scala map returned by the cache
     */
    private static Map<SystemStream, SystemStreamMetadata> getMetadata(Set<SystemStream> set, StreamMetadataCache streamMetadataCache) {
        // NOTE(review): the meaning of the boolean flag is defined by StreamMetadataCache;
        // presumably it requests partition-level metadata only -- confirm against that class.
        return JavaConversions.mapAsJavaMap(streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(set).toSet(), true));
    }

    /**
     * Captures the initial metadata of the given streams and registers one gauge per stream,
     * named {@code <system>-<stream>-partitionCount} and initialized to 0.
     *
     * @param set the streams whose partition counts should be monitored
     * @param streamMetadataCache the cache used to fetch stream metadata, now and on every update
     * @param metricsRegistryMap the registry in which the gauges are created
     * @param i the monitoring period in milliseconds; also used as the initial delay by {@link #start()}
     */
    public StreamPartitionCountMonitor(Set<SystemStream> set, StreamMetadataCache streamMetadataCache, MetricsRegistryMap metricsRegistryMap, int i) {
        this.streamsToMonitor = set;
        this.metadataCache = streamMetadataCache;
        this.metrics = metricsRegistryMap;
        this.monitorPeriodMs = i;
        this.initialMetadata = getMetadata(set, streamMetadataCache);

        Map<SystemStream, Gauge<Integer>> mutableGauges = new HashMap<SystemStream, Gauge<Integer>>();
        for (SystemStream stream : this.initialMetadata.keySet()) {
            String gaugeName = String.format("%s-%s-partitionCount", stream.getSystem(), stream.getStream());
            Gauge<Integer> gauge = metricsRegistryMap.newGauge("job-coordinator", gaugeName, 0);
            mutableGauges.put(stream, gauge);
        }
        this.gauges = Collections.unmodifiableMap(mutableGauges);
    }

    /**
     * Re-fetches the stream metadata and sets each gauge to the current partition count minus
     * the partition count recorded at construction time.
     *
     * <p>Never throws: any exception is logged, because this method runs on the scheduler and
     * an exception escaping a periodic task suppresses all its subsequent executions.
     */
    void updatePartitionCountMetric() {
        try {
            Map<SystemStream, SystemStreamMetadata> currentMetadata = getMetadata(this.streamsToMonitor, this.metadataCache);
            for (Map.Entry<SystemStream, SystemStreamMetadata> entry : this.initialMetadata.entrySet()) {
                SystemStream stream = entry.getKey();
                SystemStreamMetadata current = currentMetadata.get(stream);
                if (current == null) {
                    // Skip just this stream so one missing entry does not abort the update
                    // of every remaining gauge in this cycle.
                    log.warn("No current metadata available for stream {}; partition count metric not updated.", stream);
                    continue;
                }
                int initialCount = entry.getValue().getSystemStreamPartitionMetadata().keySet().size();
                int currentCount = current.getSystemStreamPartitionMetadata().keySet().size();
                this.gauges.get(stream).set(currentCount - initialCount);
            }
        } catch (Exception e) {
            log.error("Exception while updating partition count metric.", e);
        }
    }

    /**
     * @return the unmodifiable stream-to-gauge map built in the constructor
     */
    Map<SystemStream, Gauge<Integer>> getGauges() {
        return this.gauges;
    }

    /**
     * Starts the periodic update. The first update runs one monitoring period after this call
     * and then repeats at a fixed rate. Calling this on a running monitor is a no-op.
     *
     * @throws IllegalStateException if the monitor has already been stopped
     */
    public void start() {
        synchronized (this.lock) {
            switch (this.state) {
                case INIT:
                    this.schedulerService.scheduleAtFixedRate(new Runnable() {
                        @Override
                        public void run() {
                            StreamPartitionCountMonitor.this.updatePartitionCountMetric();
                        }
                    }, this.monitorPeriodMs, this.monitorPeriodMs, TimeUnit.MILLISECONDS);
                    this.state = State.RUNNING;
                    break;
                case RUNNING:
                    // Already started; nothing to do.
                    return;
                case STOPPED:
                    throw new IllegalStateException("StreamPartitionCountMonitor was stopped and cannot be restarted.");
            }
        }
    }

    /**
     * Shuts the scheduler down immediately and marks the monitor as stopped. This applies from
     * any state, so a monitor stopped before it was started can no longer be started.
     */
    public void stop() {
        synchronized (this.lock) {
            this.schedulerService.shutdownNow();
            this.state = State.STOPPED;
        }
    }

    /**
     * @return {@code true} if the monitor has been started and not yet stopped
     */
    boolean isRunning() {
        return this.state == State.RUNNING;
    }

    /**
     * Blocks until the scheduler has terminated after a shutdown request, the timeout elapses,
     * or the calling thread is interrupted, whichever happens first.
     *
     * @param j the maximum time to wait
     * @param timeUnit the unit of {@code j}
     * @return {@code true} if the scheduler terminated, {@code false} if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    boolean awaitTermination(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.schedulerService.awaitTermination(j, timeUnit);
    }
}
