package org.killbill.queue;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.killbill.commons.concurrent.Executors;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.QueueLifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/killbill-queue-0.20.17.jar:org/killbill/queue/DefaultQueueLifecycle.class */
public abstract class DefaultQueueLifecycle implements QueueLifecycle {
    public static final String QUEUE_NAME = "Queue";
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DefaultQueueLifecycle.class);
    private static final long ONE_MILLION = 1000000;
    protected final String svcQName;
    protected final ObjectMapper objectMapper;
    protected final PersistentQueueConfig config;
    private volatile boolean isProcessingEvents;
    private ExecutorService executor;

    public DefaultQueueLifecycle(String str, PersistentQueueConfig persistentQueueConfig) {
        this(str, persistentQueueConfig, QueueObjectMapper.get());
    }

    private DefaultQueueLifecycle(String str, PersistentQueueConfig persistentQueueConfig, ObjectMapper objectMapper) {
        this.svcQName = str;
        this.config = persistentQueueConfig;
        this.isProcessingEvents = false;
        this.objectMapper = objectMapper;
    }

    @Override // org.killbill.queue.api.QueueLifecycle
    public boolean startQueue() {
        this.executor = Executors.newFixedThreadPool(1, this.config.getTableName() + "-lifecycle-th");
        this.isProcessingEvents = true;
        log.info(String.format("%s: Starting...", this.svcQName));
        this.executor.execute(new Runnable() { // from class: org.killbill.queue.DefaultQueueLifecycle.1
            @Override // java.lang.Runnable
            public void run() {
                DefaultQueueLifecycle.log.info(String.format("%s: Thread %s [%d] starting", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())));
                while (DefaultQueueLifecycle.this.isProcessingEvents) {
                    try {
                        try {
                            long nanoTime = System.nanoTime();
                            try {
                                try {
                                    DefaultQueueLifecycle.this.doProcessEvents();
                                    sleepALittle((System.nanoTime() - nanoTime) / DefaultQueueLifecycle.ONE_MILLION);
                                } catch (Exception e) {
                                    DefaultQueueLifecycle.log.warn(String.format("%s: Thread  %s  [%d] got an exception, catching and moving on...", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())), (Throwable) e);
                                    sleepALittle((System.nanoTime() - nanoTime) / DefaultQueueLifecycle.ONE_MILLION);
                                }
                            } catch (Throwable th) {
                                sleepALittle((System.nanoTime() - nanoTime) / DefaultQueueLifecycle.ONE_MILLION);
                                throw th;
                            }
                        } catch (Throwable th2) {
                            DefaultQueueLifecycle.log.info(String.format("%s: Thread %s has exited", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName()));
                            throw th2;
                        }
                    } catch (InterruptedException e2) {
                        DefaultQueueLifecycle.log.info(String.format("%s: Thread %s got interrupted, exting... ", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName()));
                        DefaultQueueLifecycle.log.info(String.format("%s: Thread %s has exited", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName()));
                        return;
                    } catch (Throwable th3) {
                        DefaultQueueLifecycle.log.error(String.format("%s: Thread %s got an exception, exting... ", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName()), th3);
                        DefaultQueueLifecycle.log.info(String.format("%s: Thread %s has exited", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName()));
                        return;
                    }
                }
                DefaultQueueLifecycle.log.info(String.format("%s: Thread %s has exited", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName()));
            }

            private void sleepALittle(long j) throws InterruptedException {
                if (DefaultQueueLifecycle.this.config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS) {
                    return;
                }
                long pollingSleepTimeMs = DefaultQueueLifecycle.this.config.getPollingSleepTimeMs() - j;
                if (pollingSleepTimeMs > 0) {
                    Thread.sleep(pollingSleepTimeMs);
                }
            }
        });
        return true;
    }

    @Override // org.killbill.queue.api.QueueLifecycle
    public void stopQueue() {
        this.isProcessingEvents = false;
        this.executor.shutdownNow();
        try {
            this.executor.awaitTermination(this.config.getPollingSleepTimeMs(), TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info(String.format("%s: Stop sequence has been interrupted", this.svcQName));
        }
    }

    public abstract int doProcessEvents();

    public ObjectMapper getObjectMapper() {
        return this.objectMapper;
    }
}
