package org.killbill.queue.dispatching;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.joda.time.DateTime;
import org.killbill.clock.Clock;
import org.killbill.queue.DBBackedQueue;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.api.QueueEvent;
import org.killbill.queue.dao.EventEntryModelDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/killbill-queue-0.20.17.jar:org/killbill/queue/dispatching/CallableCallbackBase.class */
/**
 * Partial {@link CallableCallback} implementation that turns a persisted queue entry back into
 * its event object and records the outcome of a dispatch attempt through the backing
 * {@link DBBackedQueue}.
 *
 * <p>Subclasses supply the actual {@link #dispatch} logic and the {@link #buildEntry} factory used
 * to create the entry that gets written back to the queue.
 *
 * @param <E> type of the event carried by the queue entry
 * @param <M> type of the persisted queue entry
 */
public abstract class CallableCallbackBase<E extends QueueEvent, M extends EventEntryModelDao> implements CallableCallback<E, M> {

    private static final Logger log = LoggerFactory.getLogger(CallableCallbackBase.class);

    private final DBBackedQueue<M> dao;
    private final Clock clock;
    private final PersistentQueueConfig config;
    private final ObjectMapper objectMapper;

    /**
     * @param dBBackedQueue         queue used to update entries or move them to history
     * @param clock                 source of the timestamp passed to {@link #buildEntry}
     * @param persistentQueueConfig configuration providing the maximum number of failure retries
     * @param objectMapper          mapper used by {@link #deserialize} to read the event JSON
     */
    public CallableCallbackBase(DBBackedQueue<M> dBBackedQueue, Clock clock, PersistentQueueConfig persistentQueueConfig, ObjectMapper objectMapper) {
        this.dao = dBBackedQueue;
        this.clock = clock;
        this.config = persistentQueueConfig;
        this.objectMapper = objectMapper;
    }

    /**
     * Deserializes the event stored in {@code m} using this callback's {@link ObjectMapper}.
     *
     * @param m queue entry holding the event JSON and the event class name
     * @return the deserialized event, or {@code null} if deserialization failed
     */
    @Override
    public E deserialize(M m) {
        return CallableCallbackBase.<E, M>deserializeEvent(m, this.objectMapper);
    }

    /**
     * Reads the JSON held by {@code m} into an instance of the class named by
     * {@code m.getClassName()}.
     *
     * <p>Any exception raised while loading the class or parsing the JSON is logged at error
     * level and swallowed; callers must be prepared for a {@code null} result.
     *
     * @param m            queue entry holding the event JSON and the event class name
     * @param objectMapper mapper used to parse the JSON
     * @return the deserialized event, or {@code null} if anything went wrong
     */
    @SuppressWarnings("unchecked") // the target class is only known by name at runtime
    public static <E extends QueueEvent, M extends EventEntryModelDao> E deserializeEvent(M m, ObjectMapper objectMapper) {
        try {
            return (E) objectMapper.readValue(m.getEventJson(), Class.forName(m.getClassName()));
        } catch (Exception e) {
            final String message = String.format("Failed to deserialize json object %s for class %s", m.getEventJson(), m.getClassName());
            log.error(message, e);
            return null;
        }
    }

    @Override
    public abstract void dispatch(E e, M m) throws Exception;

    /**
     * Creates the entry that this class hands to the {@link DBBackedQueue}.
     *
     * <p>As invoked from this class, the arguments are: the entry that was processed, the current
     * UTC time from the clock, the lifecycle state to record, and the error count to record.
     * What the implementation does with them is defined by subclasses.
     */
    public abstract M buildEntry(M m, DateTime dateTime, PersistentQueueEntryLifecycleState persistentQueueEntryLifecycleState, long j);

    /**
     * Records the outcome of a dispatch attempt.
     *
     * <ul>
     *   <li>{@code th == null}: the entry is moved to history in state {@code PROCESSED}.</li>
     *   <li>{@code th != null} and {@code j} does not exceed the configured maximum failure
     *       retries: the entry is passed to {@code updateOnError} in state {@code AVAILABLE}
     *       with error count {@code j}.</li>
     *   <li>otherwise: the entry is moved to history in state {@code FAILED}.</li>
     * </ul>
     *
     * @param e  the dispatched event (not used by this implementation)
     * @param m  the queue entry that was dispatched
     * @param j  error count to compare against the retry limit and to store on retry
     * @param th the failure raised by the dispatch, or {@code null} on success
     */
    @Override
    public void updateErrorCountOrMoveToHistory(E e, M m, long j, Throwable th) {
        if (th == null) {
            moveToHistory(m, PersistentQueueEntryLifecycleState.PROCESSED);
            if (log.isDebugEnabled()) {
                log.debug("Done handling notification {}, key = {}", m.getRecordId(), m.getEventJson());
            }
            return;
        }

        final boolean retriesExhausted = j > this.config.getMaxFailureRetries();
        if (retriesExhausted) {
            log.error("Fatal NotificationQ dispatch error, data corruption...", th);
            moveToHistory(m, PersistentQueueEntryLifecycleState.FAILED);
            return;
        }

        log.info("Dispatch error, will attempt a retry ", th);
        final M retryEntry = buildEntry(m, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.AVAILABLE, j);
        this.dao.updateOnError(retryEntry);
    }

    /**
     * Moves {@code m} to history in the given terminal state, keeping the error count already
     * stored on the entry.
     */
    private void moveToHistory(M m, PersistentQueueEntryLifecycleState terminalState) {
        final M historyEntry = buildEntry(m, this.clock.getUTCNow(), terminalState, m.getErrorCount().longValue());
        this.dao.moveEntryToHistory(historyEntry);
    }
}
