package com.touscm.quicker.mq.pulsar;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.PreDestroy;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/touscm/quicker/mq/pulsar/DefaultPulsarConsumer.class */
public class DefaultPulsarConsumer<T> implements IConsumer<T> {
    private static final Logger logger = LoggerFactory.getLogger(DefaultPulsarConsumer.class);
    private final PulsarClient client;
    private Consumer<T> consumer;
    private int executorCount = 0;
    private ConsumeMode consumeMode;
    private Function<T, Boolean> receiver;
    private ExecutorService executorService;

    public DefaultPulsarConsumer(PulsarClient pulsarClient) {
        this.client = pulsarClient;
    }

    @Override // com.touscm.quicker.mq.pulsar.IConsumer
    public void init(@NotNull Class<T> cls, @NotBlank String str, SubscriptionType subscriptionType, @NotBlank String str2) {
        initConsumer(cls, str, subscriptionType, str2);
    }

    @Override // com.touscm.quicker.mq.pulsar.IConsumer
    public void start(@NotNull Function<T, Boolean> function) {
        start(function, ConsumeMode.Longtime, 1, 0L, 0L);
    }

    @Override // com.touscm.quicker.mq.pulsar.IConsumer
    public void start(@NotNull Function<T, Boolean> function, int i) {
        start(function, ConsumeMode.Longtime, i, 0L, 0L);
    }

    @Override // com.touscm.quicker.mq.pulsar.IConsumer
    public void start(@NotNull Function<T, Boolean> function, long j, long j2) {
        start(function, ConsumeMode.Scheduled, 0, j, j2);
    }

    @Override // com.touscm.quicker.mq.pulsar.IConsumer
    public void start(@NotNull Function<T, Boolean> function, ConsumeMode consumeMode, int i, long j, long j2) {
        if (this.consumer == null) {
            throw new RuntimeException("Please call init() method first");
        }
        this.receiver = function;
        setExecutorService(consumeMode, i);
        if (ConsumeMode.Scheduled != this.consumeMode) {
            for (int i2 = 0; i2 < this.executorCount; i2++) {
                this.executorService.execute(() -> {
                    while (true) {
                        receiveEntry();
                    }
                });
            }
            return;
        }
        if (j < 0) {
            j = 0;
        }
        if (j2 <= 0) {
            j2 = 1;
        }
        ((ScheduledExecutorService) this.executorService).scheduleAtFixedRate(this::receiveEntry, j, j2, TimeUnit.SECONDS);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    @PreDestroy
    public void close() throws IOException {
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    private void initConsumer(Class<T> cls, String str, SubscriptionType subscriptionType, String str2) {
        if (this.consumer != null) {
            logger.warn("consumer has initialize, entryType:{}, topic:{}, subscriptionType:{}, subscribe:{}", new Object[]{cls, str, subscriptionType, str2});
            return;
        }
        if (subscriptionType == null) {
            subscriptionType = SubscriptionType.Shared;
        }
        try {
            ConsumerBuilder consumerBuilder = this.client.newConsumer(DefaultImplementation.newJSONSchema(SchemaDefinition.builder().withPojo(cls).build())).topic(new String[]{str});
            if (StringUtils.isNotEmpty(str2)) {
                consumerBuilder.subscriptionName(str2);
            }
            this.consumer = consumerBuilder.subscriptionType(subscriptionType).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        } catch (PulsarClientException e) {
            logger.error("create consumer with exception, topic:{}, subscribe:{}", new Object[]{str, str2, e});
            throw new RuntimeException("create consumer with exception", e);
        }
    }

    private void setExecutorService(ConsumeMode consumeMode, int i) {
        this.consumeMode = consumeMode == null ? ConsumeMode.Longtime : consumeMode;
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        if (i <= 0) {
            this.executorCount = 1;
        } else {
            this.executorCount = Math.min(availableProcessors, i);
        }
        if (this.executorService != null) {
            logger.info("reset consumer executorService, shutdown old pool");
            this.executorService.shutdown();
        }
        logger.info("set consumer executorService, consumeMode:{}, executorCount:{}", this.consumeMode, Integer.valueOf(this.executorCount));
        this.executorService = this.consumeMode == ConsumeMode.Longtime ? Executors.newWorkStealingPool(this.executorCount) : Executors.newScheduledThreadPool(availableProcessors);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void receiveEntry() {
        if (!this.consumer.isConnected()) {
            logger.error("pulsar client already closed");
            return;
        }
        try {
            Message<T> receive = this.consumer.receive();
            Object value = receive.getValue();
            if (value == null) {
                logger.error("get message entry error, entry is null, messageKey:{}", receive.getKey());
                return;
            }
            boolean z = false;
            try {
                z = ((Boolean) this.receiver.apply(value)).booleanValue();
            } catch (Throwable th) {
                logger.error("process message entry with exception, messageKey:{}, entry:{}", new Object[]{receive.getKey(), value, th});
            }
            if (z) {
                acknowledgeConsume(receive);
            }
        } catch (Throwable th2) {
            logger.error("receive message with exception", th2);
        }
    }

    private void acknowledgeConsume(Message<T> message) {
        try {
            this.consumer.acknowledge(message.getMessageId());
        } catch (Throwable th) {
            try {
                this.consumer.negativeAcknowledge(message);
            } catch (Throwable th2) {
            }
        }
    }
}
