package de.fraunhofer.iosb.ilt.faaast.service.assetconnection;

import de.fraunhofer.iosb.ilt.faaast.service.ServiceContext;
import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig;
import de.fraunhofer.iosb.ilt.faaast.service.exception.ConfigurationException;
import de.fraunhofer.iosb.ilt.faaast.service.exception.InvalidConfigurationException;
import de.fraunhofer.iosb.ilt.faaast.service.model.api.Request;
import de.fraunhofer.iosb.ilt.faaast.service.model.api.Response;
import de.fraunhofer.iosb.ilt.faaast.service.model.api.request.SetSubmodelElementValueByPathRequest;
import de.fraunhofer.iosb.ilt.faaast.service.model.value.DataElementValue;
import de.fraunhofer.iosb.ilt.faaast.service.model.value.ElementValue;
import de.fraunhofer.iosb.ilt.faaast.service.util.ElementValueHelper;
import de.fraunhofer.iosb.ilt.faaast.service.util.LambdaExceptionHelper;
import de.fraunhofer.iosb.ilt.faaast.service.util.ReferenceHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.eclipse.digitaltwin.aas4j.v3.model.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/fraunhofer/iosb/ilt/faaast/service/assetconnection/AssetConnectionManager.class */
/**
 * Manages the asset connections of a service.
 *
 * <p>The manager establishes all connections on {@link #start()}, retries failed connections in the
 * background, forwards values received via subscription providers to the service, and offers lookup
 * of value, operation and subscription providers by reference.
 */
public class AssetConnectionManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(AssetConnectionManager.class);
    private final List<AssetConnection> connections;
    private final CoreConfig coreConfig;
    private final ServiceContext serviceContext;
    private final ScheduledExecutorService scheduledExecutorService;
    // Cleared by stop(); read by the background retry tasks to know when to give up.
    private volatile boolean active = true;

    /**
     * Creates a new manager and validates the given connections.
     *
     * @param coreConfig the core configuration (provides the connection retry interval)
     * @param list the initial connections; {@code null} is treated as an empty list. The list is
     *        used as-is (not copied), so it must be mutable if {@link #add} is going to be used
     * @param serviceContext the service context used to push subscription updates into the service
     * @throws ConfigurationException if more than one provider of the same kind is registered for
     *         the same reference
     */
    public AssetConnectionManager(CoreConfig coreConfig, List<AssetConnection> list, ServiceContext serviceContext) throws ConfigurationException {
        this.coreConfig = coreConfig;
        this.connections = list != null ? list : new ArrayList<>();
        this.serviceContext = serviceContext;
        validateConnections();
        AtomicLong threadCounter = new AtomicLong(0);
        ThreadFactory threadFactory = runnable -> new Thread(
                runnable,
                String.format("asset connection establisher - %d", threadCounter.getAndIncrement()));
        // Use at least one core thread so the pool is never created with a core size of zero when
        // no connections are configured up front.
        this.scheduledExecutorService = Executors.newScheduledThreadPool(Math.max(1, this.connections.size()), threadFactory);
    }

    /**
     * Connects to all assets. Connections that cannot be established on the first attempt are
     * retried in the background until they succeed or {@link #stop()} is called.
     */
    public void start() {
        if (!this.connections.isEmpty()) {
            LOGGER.info("Connecting to assets...");
        }
        for (AssetConnection connection : this.connections) {
            try {
                tryConnecting(connection);
                setupSubscriptions(connection);
            } catch (AssetConnectionException e) {
                LOGGER.info(
                        "Establishing asset connection failed on initial attempt (endpoint: {}). Connecting will be retried every {}ms but no more messages about failures will be shown.",
                        connection.getEndpointInformation(),
                        this.coreConfig.getAssetConnectionRetryInterval(),
                        e);
                setupConnectionAsync(connection);
            }
        }
    }

    /**
     * Performs a single connection attempt and logs success.
     *
     * @param assetConnection the connection to establish
     * @throws AssetConnectionException if connecting fails
     */
    private void tryConnecting(AssetConnection assetConnection) throws AssetConnectionException {
        assetConnection.connect();
        LOGGER.info("Asset connection established (endpoint: {})", assetConnection.getEndpointInformation());
    }

    /**
     * Repeatedly tries to connect, sleeping for the configured retry interval between attempts.
     * Returns when the connection is established, the manager has been stopped, or the current
     * thread is interrupted.
     *
     * @param assetConnection the connection to establish
     */
    private void tryConnectingUntilSuccess(AssetConnection assetConnection) {
        while (this.active && !assetConnection.isConnected()) {
            try {
                tryConnecting(assetConnection);
            } catch (AssetConnectionException e) {
                LOGGER.trace("Establishing asset connection failed (endpoint: {})", assetConnection.getEndpointInformation(), e);
                try {
                    Thread.sleep(this.coreConfig.getAssetConnectionRetryInterval());
                } catch (InterruptedException interrupted) {
                    LOGGER.error("Error while establishing asset connection (endpoint: {})", assetConnection.getEndpointInformation(), interrupted);
                    Thread.currentThread().interrupt();
                    // Stop retrying: with the interrupt flag set every further sleep would throw
                    // immediately and turn this loop into a busy loop.
                    return;
                }
            }
        }
    }

    /**
     * Registers a listener on every subscription provider of the connection that writes received
     * values into the service (without syncing them back to the asset). Does nothing once the
     * manager has been stopped.
     *
     * @param assetConnection the connection whose subscription providers are wired up
     */
    private void setupSubscriptions(AssetConnection assetConnection) {
        if (!this.active) {
            return;
        }
        Map<?, ?> subscriptionProviders = assetConnection.getSubscriptionProviders();
        for (Map.Entry<?, ?> entry : subscriptionProviders.entrySet()) {
            Reference reference = (Reference) entry.getKey();
            AssetSubscriptionProvider provider = (AssetSubscriptionProvider) entry.getValue();
            try {
                provider.addNewDataListener(newValue -> {
                    Response response = this.serviceContext.execute((Request) ((SetSubmodelElementValueByPathRequest.Builder) SetSubmodelElementValueByPathRequest.builder().submodelId(reference.getKeys().get(0).getValue())).path(ReferenceHelper.toPath(reference)).disableSyncWithAsset().value(newValue).build());
                    if (!response.getStatusCode().isSuccess()) {
                        LOGGER.info("Error updating value from asset connection subscription (reference: {})", ReferenceHelper.toString(reference));
                        LOGGER.debug("Error updating value from asset connection subscription (reference: {}, reason: {})", ReferenceHelper.toString(reference), response.getResult().getMessages());
                    }
                });
            } catch (AssetConnectionException e) {
                LOGGER.warn("Subscribing to asset connection failed (reference: {})", ReferenceHelper.toString(reference), e);
            }
        }
    }

    /**
     * Schedules a background task that keeps trying to connect and, once connected, sets up the
     * subscriptions of the connection.
     *
     * @param assetConnection the connection to establish asynchronously
     */
    private void setupConnectionAsync(AssetConnection assetConnection) {
        if (this.scheduledExecutorService.isShutdown()) {
            // stop() has already been called; scheduling would be rejected by the executor.
            LOGGER.debug("Not retrying asset connection because the manager has been stopped (endpoint: {})", assetConnection.getEndpointInformation());
            return;
        }
        this.scheduledExecutorService.schedule(() -> {
            tryConnectingUntilSuccess(assetConnection);
            // The retry loop also ends on stop/interrupt, so only subscribe when really connected.
            if (assetConnection.isConnected()) {
                setupSubscriptions(assetConnection);
            }
        }, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Adds a connection described by the given configuration. If an equal connection is already
     * managed, the providers of the configuration are registered on that existing connection;
     * otherwise the newly created connection is added.
     *
     * @param assetConnectionConfig the configuration of the connection to add
     * @throws ConfigurationException if the configuration is invalid or results in more than one
     *         provider of the same kind for a reference. A newly added connection is removed again
     *         in that case; providers registered on an already existing connection are not rolled
     *         back
     * @throws AssetConnectionException if creating the connection or registering a provider fails
     */
    public void add(AssetConnectionConfig<? extends AssetConnection, ? extends AssetValueProviderConfig, ? extends AssetOperationProviderConfig, ? extends AssetSubscriptionProviderConfig> assetConnectionConfig) throws ConfigurationException, AssetConnectionException {
        AssetConnection candidate = (AssetConnection) assetConnectionConfig.newInstance(this.coreConfig, this.serviceContext);
        Optional<AssetConnection> existing = this.connections.stream()
                .filter(connection -> Objects.equals(connection, candidate))
                .findFirst();
        if (existing.isPresent()) {
            AssetConnection target = existing.get();
            assetConnectionConfig.getValueProviders().forEach(LambdaExceptionHelper.rethrowBiConsumer((reference, assetValueProviderConfig) -> {
                target.registerValueProvider(reference, assetValueProviderConfig);
            }));
            assetConnectionConfig.getSubscriptionProviders().forEach(LambdaExceptionHelper.rethrowBiConsumer((reference2, assetSubscriptionProviderConfig) -> {
                target.registerSubscriptionProvider(reference2, assetSubscriptionProviderConfig);
            }));
            assetConnectionConfig.getOperationProviders().forEach(LambdaExceptionHelper.rethrowBiConsumer((reference3, assetOperationProviderConfig) -> {
                target.registerOperationProvider(reference3, assetOperationProviderConfig);
            }));
            validateConnections();
            return;
        }
        this.connections.add(candidate);
        try {
            validateConnections();
        } catch (ConfigurationException e) {
            // Do not keep a connection that makes the overall configuration invalid, otherwise
            // every subsequent validation would fail as well.
            this.connections.remove(candidate);
            throw e;
        }
    }

    /**
     * Returns the managed connections.
     *
     * @return the internal (live) list of connections
     */
    public List<AssetConnection> getConnections() {
        return this.connections;
    }

    /**
     * Stops the manager: ends background connection attempts and disconnects all connected
     * connections. Waits up to twice the retry interval for running attempts to finish before
     * interrupting them.
     */
    public void stop() {
        this.active = false;
        // Without shutdown() the executor never terminates and awaitTermination always times out.
        this.scheduledExecutorService.shutdown();
        try {
            boolean terminated = this.scheduledExecutorService.awaitTermination(
                    this.coreConfig.getAssetConnectionRetryInterval() * 2,
                    TimeUnit.MILLISECONDS);
            if (!terminated) {
                this.scheduledExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.scheduledExecutorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
        for (AssetConnection connection : this.connections) {
            if (!connection.isConnected()) {
                continue;
            }
            try {
                connection.disconnect();
            } catch (AssetConnectionException e) {
                LOGGER.trace("Error closing asset connection (endpoint: {})", connection.getEndpointInformation(), e);
            }
        }
    }

    /**
     * Finds the operation provider registered for the given reference.
     *
     * @param reference the reference to look up
     * @return the first matching provider, or {@code null} if there is none
     */
    public AssetOperationProvider getOperationProvider(Reference reference) {
        for (AssetConnection connection : this.connections) {
            Object provider = findProvider(connection.getOperationProviders(), reference);
            if (provider != null) {
                return (AssetOperationProvider) provider;
            }
        }
        return null;
    }

    /**
     * Finds the subscription provider registered for the given reference.
     *
     * @param reference the reference to look up
     * @return the first matching provider, or {@code null} if there is none
     */
    public AssetSubscriptionProvider getSubscriptionProvider(Reference reference) {
        for (AssetConnection connection : this.connections) {
            Object provider = findProvider(connection.getSubscriptionProviders(), reference);
            if (provider != null) {
                return (AssetSubscriptionProvider) provider;
            }
        }
        return null;
    }

    /**
     * Finds the value provider registered for the given reference.
     *
     * @param reference the reference to look up
     * @return the first matching provider, or {@code null} if there is none
     */
    public AssetValueProvider getValueProvider(Reference reference) {
        for (AssetConnection connection : this.connections) {
            Object provider = findProvider(connection.getValueProviders(), reference);
            if (provider != null) {
                return (AssetValueProvider) provider;
            }
        }
        return null;
    }

    /**
     * Writes a value to the asset if a value provider exists for the reference and the value is a
     * valid data element value; otherwise does nothing. Providers that do not support writing are
     * silently skipped.
     *
     * @param reference the reference of the element to write
     * @param elementValue the value to write
     * @throws AssetConnectionException if writing to the asset fails
     */
    public void setValue(Reference reference, ElementValue elementValue) throws AssetConnectionException {
        AssetValueProvider provider = getValueProvider(reference);
        if (provider == null || !ElementValueHelper.isValidDataElementValue(elementValue)) {
            return;
        }
        try {
            provider.setValue((DataElementValue) elementValue);
        } catch (UnsupportedOperationException ignored) {
            // Best effort: the provider does not support writing, which is not an error here.
            LOGGER.trace("Value provider does not support writing (reference: {})", ReferenceHelper.toString(reference));
        }
    }

    /**
     * Reads the current value from the asset.
     *
     * @param reference the reference of the element to read
     * @return the value read, or an empty optional if there is no value provider for the reference,
     *         the provider does not support reading, or the provider returned {@code null}
     * @throws AssetConnectionException if reading from the asset fails
     */
    public Optional<DataElementValue> readValue(Reference reference) throws AssetConnectionException {
        AssetValueProvider provider = getValueProvider(reference);
        if (provider == null) {
            return Optional.empty();
        }
        try {
            return Optional.ofNullable(provider.getValue());
        } catch (UnsupportedOperationException ignored) {
            // Best effort: the provider does not support reading, which is not an error here.
            LOGGER.trace("Value provider does not support reading (reference: {})", ReferenceHelper.toString(reference));
            return Optional.empty();
        }
    }

    /**
     * Checks whether an operation provider exists for the reference.
     *
     * @param reference the reference to look up
     * @return {@code true} if a provider is registered, {@code false} otherwise
     */
    public boolean hasOperationProvider(Reference reference) {
        return getOperationProvider(reference) != null;
    }

    /**
     * Checks whether a subscription provider exists for the reference.
     *
     * @param reference the reference to look up
     * @return {@code true} if a provider is registered, {@code false} otherwise
     */
    public boolean hasSubscriptionProvider(Reference reference) {
        return getSubscriptionProvider(reference) != null;
    }

    /**
     * Checks whether a value provider exists for the reference.
     *
     * @param reference the reference to look up
     * @return {@code true} if a provider is registered, {@code false} otherwise
     */
    public boolean hasValueProvider(Reference reference) {
        return getValueProvider(reference) != null;
    }

    /**
     * Ensures that, across all connections, each reference has at most one value provider, one
     * operation provider and one subscription provider.
     *
     * @throws ConfigurationException if any reference has more than one provider of the same kind
     */
    private void validateConnections() throws ConfigurationException {
        List<Reference> valueReferences = new ArrayList<>();
        List<Reference> operationReferences = new ArrayList<>();
        List<Reference> subscriptionReferences = new ArrayList<>();
        for (AssetConnection connection : this.connections) {
            collectReferences(connection.getValueProviders(), valueReferences);
            collectReferences(connection.getOperationProviders(), operationReferences);
            collectReferences(connection.getSubscriptionProviders(), subscriptionReferences);
        }
        ensureAtMostOneProvider("value", valueReferences);
        ensureAtMostOneProvider("operation", operationReferences);
        ensureAtMostOneProvider("subscription", subscriptionReferences);
    }

    /** Returns the value of the first entry whose key matches the reference, or {@code null}. */
    private static Object findProvider(Map<?, ?> providers, Reference reference) {
        for (Map.Entry<?, ?> entry : providers.entrySet()) {
            if (ReferenceHelper.equals(reference, (Reference) entry.getKey())) {
                return entry.getValue();
            }
        }
        return null;
    }

    /** Appends all keys of the provider map (which are references) to the target list. */
    private static void collectReferences(Map<?, ?> providers, List<Reference> target) {
        for (Object key : providers.keySet()) {
            target.add((Reference) key);
        }
    }

    /** Throws if any reference occurs more than once in the list of references of one provider kind. */
    private static void ensureAtMostOneProvider(String providerKind, List<Reference> references) throws ConfigurationException {
        Map<Reference, Long> occurrences = references.stream()
                .collect(Collectors.groupingBy(reference -> reference, Collectors.counting()));
        for (Map.Entry<Reference, Long> entry : occurrences.entrySet()) {
            if (entry.getValue() > 1) {
                throw new InvalidConfigurationException(String.format(
                        "found %d %s providers for reference %s but maximum 1 allowed",
                        entry.getValue(),
                        providerKind,
                        ReferenceHelper.toString(entry.getKey())));
            }
        }
    }
}
