package io.advantageous.qbit.jms;

import io.advantageous.qbit.service.Startable;
import io.advantageous.qbit.service.Stoppable;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.transport.TransportListener;

/* loaded from: input_file:io/advantageous/qbit/jms/JmsService.class */
public class JmsService implements Stoppable, Startable {
    private final Supplier<Connection> connectionSupplier;
    private final Function<String, Destination> destinationSupplier;
    private final boolean transacted;
    private final int acknowledgeMode;
    private final boolean startConnection;
    private final String defaultDestination;
    private final int defaultTimeout;
    private final AtomicBoolean connected = new AtomicBoolean();
    private Optional<Connection> connectionOption = Optional.empty();
    private Optional<Session> sessionOption = Optional.empty();
    private Map<String, Destination> destinations = new LinkedHashMap();
    private Map<String, MessageProducer> producers = new LinkedHashMap();
    private Map<String, MessageConsumer> consumers = new LinkedHashMap();

    public JmsService(Supplier<Connection> supplier, Function<String, Destination> function, boolean z, int i, boolean z2, String str, int i2) {
        this.connectionSupplier = supplier;
        this.destinationSupplier = function;
        this.transacted = z;
        this.acknowledgeMode = i;
        this.startConnection = z2;
        this.defaultDestination = str;
        this.defaultTimeout = i2;
        getConnection();
    }

    private Destination getDestination(String str) {
        if (!this.destinations.containsKey(str)) {
            this.destinations.put(str, this.destinationSupplier.apply(str));
        }
        return this.destinations.get(str);
    }

    private MessageConsumer getConsumer(String str) {
        if (!this.consumers.containsKey(str)) {
            try {
                this.consumers.put(str, getSession().createConsumer(getDestination(str)));
            } catch (JMSException e) {
                throw new JmsException("Unable to create consumer for destination " + str, e);
            }
        }
        return this.consumers.get(str);
    }

    private MessageProducer getProducer(String str) {
        if (!this.producers.containsKey(str)) {
            try {
                this.producers.put(str, getSession().createProducer(getDestination(str)));
            } catch (JMSException e) {
                throw new JmsException("Unable to create producer for destination " + str, e);
            }
        }
        return this.producers.get(str);
    }

    private Session getSession() {
        if (!this.sessionOption.isPresent()) {
            try {
                this.sessionOption = Optional.of(getConnection().createSession(this.transacted, this.acknowledgeMode));
            } catch (JMSException e) {
                throw new JmsException("Unable to get JMS session", e);
            }
        }
        return this.sessionOption.get();
    }

    private Connection getConnection() {
        if (!this.connectionOption.isPresent()) {
            ActiveMQConnection activeMQConnection = (Connection) this.connectionSupplier.get();
            if (activeMQConnection instanceof ActiveMQConnection) {
                activeMQConnection.addTransportListener(new TransportListener() { // from class: io.advantageous.qbit.jms.JmsService.1
                    public void onCommand(Object obj) {
                    }

                    public void onException(IOException iOException) {
                    }

                    public void transportInterupted() {
                        JmsService.this.connected.set(false);
                    }

                    public void transportResumed() {
                        JmsService.this.connected.set(true);
                    }
                });
            }
            this.connected.set(true);
            if (this.startConnection) {
                try {
                    activeMQConnection.start();
                } catch (JMSException e) {
                    throw new JmsException("Unable to start JMS connection", e);
                }
            }
            this.connectionOption = Optional.of(activeMQConnection);
        }
        return this.connectionOption.get();
    }

    public void sendTextMessageWithDestination(String str, String str2) {
        if (!isConnected()) {
            throw new JmsNotConnectedException("JMS connection is down " + str);
        }
        try {
            getProducer(str).send(getSession().createTextMessage(str2));
        } catch (JMSException e) {
            throw new JmsException("Unable to send message to " + str, e);
        }
    }

    public void sendTextMessage(String str) {
        sendTextMessageWithDestination(this.defaultDestination, str);
    }

    public void listenTextMessagesWithDestination(String str, Consumer<String> consumer) {
        try {
            getConsumer(str).setMessageListener(message -> {
                try {
                    consumer.accept(((TextMessage) message).getText());
                    if (this.acknowledgeMode == 2) {
                        message.acknowledge();
                    }
                } catch (Exception e) {
                    throw new IllegalStateException("Unable handle JMS Consumer  " + str, e);
                } catch (JMSException e2) {
                    throw new JmsException("Unable to register get text from message in listener " + str, e2);
                }
            });
        } catch (JMSException e) {
            throw new JmsException("Unable to register message listener " + str, e);
        }
    }

    public void listenTextMessages(Consumer<String> consumer) {
        listenTextMessagesWithDestination(this.defaultDestination, consumer);
    }

    public String receiveTextMessageFromDestinationWithTimeout(String str, int i) {
        if (!isConnected()) {
            throw new JmsNotConnectedException("Not connected");
        }
        MessageConsumer consumer = getConsumer(str);
        try {
            TextMessage receive = i == 0 ? (TextMessage) consumer.receiveNoWait() : consumer.receive(i);
            if (receive == null) {
                return null;
            }
            if (this.acknowledgeMode == 2) {
                receive.acknowledge();
            }
            return receive.getText();
        } catch (JMSException e) {
            throw new JmsException("Unable to receive message from " + str, e);
        }
    }

    public String receiveTextMessageFromDestination(String str) {
        return receiveTextMessageFromDestinationWithTimeout(str, this.defaultTimeout);
    }

    public String receiveTextMessage() {
        return receiveTextMessageFromDestination(this.defaultDestination);
    }

    public String receiveTextMessageWithTimeout(int i) {
        return receiveTextMessageFromDestinationWithTimeout(this.defaultDestination, i);
    }

    public void stop() {
        if (this.connectionOption.isPresent()) {
            try {
                if (this.startConnection) {
                    this.connectionOption.get().close();
                }
                this.connectionOption = Optional.empty();
                this.sessionOption = Optional.empty();
                this.producers.clear();
                this.consumers.clear();
                this.destinations.clear();
            } catch (JMSException e) {
                throw new JmsException("Unable to stop ", e);
            }
        }
    }

    public void start() {
        getConnection();
    }

    public boolean isConnected() {
        return this.connected.get();
    }
}
