package org.killbill.billing.util.broadcast;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import org.killbill.billing.platform.api.LifecycleHandlerType;
import org.killbill.billing.util.broadcast.dao.BroadcastDao;
import org.killbill.billing.util.broadcast.dao.BroadcastModelDao;
import org.killbill.billing.util.config.definition.BroadcastConfig;
import org.killbill.bus.api.PersistentBus;
import org.killbill.commons.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/killbill-util-0.18.20.jar:org/killbill/billing/util/broadcast/DefaultBroadcastService.class */
/**
 * Default {@link BroadcastService} implementation.
 *
 * <p>Once started, a single-threaded scheduled executor periodically reads the broadcast
 * entries whose record id is greater than the last one processed (via
 * {@link BroadcastDao#getLatestEntriesFrom}) and re-posts each of them on the
 * {@link PersistentBus} as a {@link DefaultBroadcastInternalEvent}.
 *
 * <p>Lifecycle: {@link #initialize()} positions the cursor and creates the executor,
 * {@link #start()} schedules the polling task and {@link #stop()} shuts the executor down.
 */
public class DefaultBroadcastService implements BroadcastService {
    /** Maximum time, in seconds, that {@link #stop()} waits for the executor to terminate. */
    private static final int TERMINATION_TIMEOUT_SEC = 5;
    private static final Logger logger = LoggerFactory.getLogger(DefaultBroadcastService.class);
    public static final String BROADCAST_SERVICE_NAME = "broadcast-service";
    private final BroadcastConfig broadcastConfig;
    private final BroadcastDao broadcastDao;
    private final PersistentBus eventBus;
    /**
     * Record id of the last broadcast entry handled by the polling task. The instance is
     * created eagerly so the accessor and mutator never dereference {@code null}, even when
     * they are called before {@link #initialize()}.
     */
    private final AtomicLong latestRecordIdProcessed = new AtomicLong(0L);
    private ScheduledExecutorService broadcastExecutor;
    private volatile boolean isStopped = false;

    /**
     * Polling task: reads the entries recorded after the last processed record id and posts
     * one bus event per entry.
     */
    private static class BroadcastServiceRunnable implements Runnable {
        private final DefaultBroadcastService parent;
        private final BroadcastDao broadcastDao;
        private final PersistentBus eventBus;

        public BroadcastServiceRunnable(DefaultBroadcastService defaultBroadcastService, BroadcastDao broadcastDao, PersistentBus persistentBus) {
            this.parent = defaultBroadcastService;
            this.broadcastDao = broadcastDao;
            this.eventBus = persistentBus;
        }

        /**
         * Runs one polling iteration unless the service has been stopped.
         *
         * <p>Runtime failures are logged instead of propagated: a task scheduled with
         * {@code scheduleAtFixedRate} is never run again once one execution throws, so a
         * single transient failure would otherwise silently end all broadcasting.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (parent.isStopped()) {
                return;
            }
            try {
                broadcastPendingEntries();
            } catch (RuntimeException e) {
                logger.warn("Broadcast polling run failed, will retry on next scheduled run", e);
            }
        }

        /** Posts an event for every entry newer than the cursor, advancing the cursor as it goes. */
        private void broadcastPendingEntries() {
            final long lastProcessed = parent.getLatestRecordIdProcessed().get();
            for (final BroadcastModelDao entry : broadcastDao.getLatestEntriesFrom(lastProcessed)) {
                if (parent.isStopped()) {
                    return;
                }
                final DefaultBroadcastInternalEvent event =
                        new DefaultBroadcastInternalEvent(entry.getServiceName(), entry.getType(), entry.getEvent());
                try {
                    eventBus.post(event);
                } catch (final PersistentBus.EventBusException e) {
                    logger.warn("Failed to post event {}", event, e);
                } finally {
                    // The cursor moves forward whether or not the post succeeded, so a
                    // failing entry is not picked up again by the next run.
                    parent.setLatestRecordIdProcessed(entry.getRecordId());
                }
            }
        }
    }

    @Inject
    public DefaultBroadcastService(BroadcastDao broadcastDao, BroadcastConfig broadcastConfig, PersistentBus persistentBus) {
        this.broadcastDao = broadcastDao;
        this.broadcastConfig = broadcastConfig;
        this.eventBus = persistentBus;
    }

    /** Returns {@link #BROADCAST_SERVICE_NAME}. */
    @Override // org.killbill.billing.platform.api.KillbillService
    public String getName() {
        return BROADCAST_SERVICE_NAME;
    }

    /**
     * Positions the cursor on the latest entry known to the DAO (or 0 when there is none),
     * creates the polling executor and clears the stopped flag.
     */
    @LifecycleHandlerType(LifecycleHandlerType.LifecycleLevel.INIT_SERVICE)
    public void initialize() {
        final BroadcastModelDao latestEntry = this.broadcastDao.getLatestEntry();
        this.latestRecordIdProcessed.set(latestEntry != null ? latestEntry.getRecordId().longValue() : 0L);
        this.broadcastExecutor = Executors.newSingleThreadScheduledExecutor("BroadcastExecutor");
        this.isStopped = false;
    }

    /**
     * Schedules the polling task at the fixed rate given by
     * {@link BroadcastConfig#getBroadcastServiceRunningRate()}; the first run happens after
     * one full period.
     */
    @LifecycleHandlerType(LifecycleHandlerType.LifecycleLevel.START_SERVICE)
    public void start() {
        final TimeUnit unit = this.broadcastConfig.getBroadcastServiceRunningRate().getUnit();
        final long period = this.broadcastConfig.getBroadcastServiceRunningRate().getPeriod();
        this.broadcastExecutor.scheduleAtFixedRate(new BroadcastServiceRunnable(this, this.broadcastDao, this.eventBus), period, period, unit);
    }

    /**
     * Shuts the polling executor down, waiting up to {@link #TERMINATION_TIMEOUT_SEC} seconds
     * for it to terminate, then marks the service as stopped.
     *
     * <p>The stopped flag is only raised once the wait is over, so a polling run that is in
     * flight when this method is called keeps going during the wait. Calling this method on
     * an already stopped service only logs a warning; calling it before
     * {@link #initialize()} simply marks the service as stopped.
     */
    @LifecycleHandlerType(LifecycleHandlerType.LifecycleLevel.STOP_SERVICE)
    public void stop() {
        if (this.isStopped) {
            logger.warn("BroadcastExecutor is already in a stopped state");
            return;
        }
        if (this.broadcastExecutor == null) {
            // Never initialized: there is no executor to shut down.
            this.isStopped = true;
            return;
        }
        try {
            this.broadcastExecutor.shutdown();
            if (!this.broadcastExecutor.awaitTermination(TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS)) {
                logger.warn("BroadcastExecutor failed to complete termination within {} sec", TERMINATION_TIMEOUT_SEC);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            logger.warn("BroadcastExecutor stop sequence got interrupted");
        } finally {
            this.isStopped = true;
        }
    }

    /** Returns {@code true} once {@link #stop()} has completed. */
    public boolean isStopped() {
        return this.isStopped;
    }

    /** Returns the live counter holding the record id of the last entry processed. */
    public AtomicLong getLatestRecordIdProcessed() {
        return this.latestRecordIdProcessed;
    }

    /**
     * Moves the cursor to the given record id.
     *
     * @param l record id of the entry that was just processed; must not be {@code null}
     */
    public void setLatestRecordIdProcessed(Long l) {
        this.latestRecordIdProcessed.set(l.longValue());
    }
}
