package de.fraunhofer.iosb.ilt.faaast.service.messagebus.internal;

import de.fraunhofer.iosb.ilt.faaast.service.ServiceContext;
import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig;
import de.fraunhofer.iosb.ilt.faaast.service.exception.MessageBusException;
import de.fraunhofer.iosb.ilt.faaast.service.messagebus.MessageBus;
import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.EventMessage;
import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.SubscriptionId;
import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.SubscriptionInfo;
import de.fraunhofer.iosb.ilt.faaast.service.util.Ensure;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/fraunhofer/iosb/ilt/faaast/service/messagebus/internal/MessageBusInternal.class */
/**
 * In-memory {@link MessageBus} implementation.
 *
 * <p>Published messages are placed on an unbounded queue and delivered in FIFO order by a single
 * worker thread to every subscription whose subscribed event types and filter match the message.
 *
 * <p>The worker is backed by a single executor that is shut down in {@link #stop()}; submitting
 * to a shut down executor is rejected, so an instance cannot be started again after it has been
 * stopped.
 */
public class MessageBusInternal implements MessageBus<MessageBusInternalConfig> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusInternal.class);
    /** Maximum time {@link #stop()} waits for the worker thread to finish. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 2L;

    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Map<SubscriptionId, SubscriptionInfo> subscriptions = new ConcurrentHashMap<>();
    private final BlockingQueue<EventMessage> messageQueue = new LinkedBlockingDeque<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private MessageBusInternalConfig config;

    /**
     * Returns the configuration this message bus was initialized with.
     *
     * @return the configuration passed to {@link #init}, or {@code null} if not yet initialized
     */
    @Override
    public MessageBusInternalConfig asConfig() {
        return config;
    }

    /**
     * Stores the configuration and marks the bus as not running.
     *
     * @param coreConfig the core configuration (not used by this implementation)
     * @param messageBusInternalConfig the configuration to store
     * @param serviceContext the service context (not used by this implementation)
     */
    @Override
    public void init(CoreConfig coreConfig, MessageBusInternalConfig messageBusInternalConfig, ServiceContext serviceContext) {
        this.config = messageBusInternalConfig;
        running.set(false);
    }

    /**
     * Enqueues a message for asynchronous delivery to matching subscribers.
     *
     * <p>A {@code null} message is silently ignored.
     *
     * @param eventMessage the message to publish
     * @throws MessageBusException if the calling thread is interrupted while enqueuing; the
     *         interrupt flag of the calling thread is restored before throwing
     */
    @Override
    public void publish(EventMessage eventMessage) throws MessageBusException {
        if (eventMessage == null) {
            return;
        }
        try {
            messageQueue.put(eventMessage);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MessageBusException("adding message to queue failed", e);
        }
    }

    /**
     * Worker loop: takes messages off the queue and dispatches them until the bus is stopped or
     * the worker thread is interrupted.
     */
    private void run() {
        running.set(true);
        while (running.get()) {
            EventMessage message;
            try {
                message = messageQueue.take();
            } catch (InterruptedException e) {
                // Interruption is the shutdown signal sent by stop(); restore the flag and exit.
                Thread.currentThread().interrupt();
                return;
            }
            dispatch(message);
        }
    }

    /**
     * Delivers a message to every subscription that subscribed to its type and whose filter
     * accepts the message's element.
     *
     * <p>A failure in one subscriber's filter or handler is logged and does not prevent delivery
     * to the remaining subscribers, nor does it terminate the worker loop.
     *
     * @param message the message to deliver
     */
    private void dispatch(EventMessage message) {
        Class<?> messageType = message.getClass();
        for (SubscriptionInfo subscription : subscriptions.values()) {
            try {
                boolean subscribed = subscription.getSubscribedEvents().stream()
                        .anyMatch(type -> type.isAssignableFrom(messageType));
                if (subscribed && subscription.getFilter().test(message.getElement())) {
                    subscription.getHandler().accept(message);
                }
            } catch (RuntimeException e) {
                // Without this guard the exception would be swallowed by the discarded Future
                // returned from submit() and silently kill message delivery for all subscribers.
                LOGGER.warn("error while delivering message of type {} to subscriber", messageType.getName(), e);
            }
        }
    }

    /**
     * Starts the worker that delivers published messages to subscribers.
     */
    @Override
    public void start() {
        executor.submit(this::run);
    }

    /**
     * Stops message delivery and waits up to {@value #SHUTDOWN_TIMEOUT_SECONDS} seconds for the
     * worker to finish.
     *
     * <p>The worker is interrupted so that it wakes up even when it is blocked waiting for the
     * next message. Messages still in the queue at that point are not delivered.
     */
    @Override
    public void stop() {
        running.set(false);
        // shutdown() alone would not interrupt the worker, which is usually parked in take().
        executor.shutdownNow();
        try {
            if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOGGER.warn("message bus worker did not terminate within {} seconds", SHUTDOWN_TIMEOUT_SECONDS);
            }
        } catch (InterruptedException e) {
            LOGGER.error("interrupted while waiting for shutdown.", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers a subscription.
     *
     * @param subscriptionInfo the subscription to register; must be non-null
     * @return a newly created id identifying the subscription, usable with {@link #unsubscribe}
     */
    @Override
    public SubscriptionId subscribe(SubscriptionInfo subscriptionInfo) {
        Ensure.requireNonNull(subscriptionInfo, "subscriptionInfo must be non-null");
        SubscriptionId subscriptionId = new SubscriptionId();
        subscriptions.put(subscriptionId, subscriptionInfo);
        return subscriptionId;
    }

    /**
     * Removes the subscription registered under the given id, if any.
     *
     * @param subscriptionId the id returned by {@link #subscribe}
     */
    @Override
    public void unsubscribe(SubscriptionId subscriptionId) {
        subscriptions.remove(subscriptionId);
    }
}
