package io.aboutcode.stage.web.websocket;

import io.aboutcode.stage.subscription.SubscriptionManager;
import io.aboutcode.stage.util.Action;
import io.aboutcode.stage.web.websocket.io.WebsocketIo;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Websocket endpoint that tracks one {@link DefaultWebsocketClientSession} per connected
 * {@link Session}, forwards deserialized incoming messages to the configured
 * {@link AutowiredWebsocketHandler}s and sends whatever they return back to the originating client.
 *
 * <p>Handlers receive a {@link WebsocketContext} in {@link #initialize()} through which they can
 * publish messages to clients subscribed to a topic or to all subscribed clients.
 */
@WebSocket
public final class WebsocketEndpoint {
    private static final Logger logger = LoggerFactory.getLogger(WebsocketEndpoint.class);
    private final SubscriptionManager<Long, Consumer<Object>, Void> broadcastSubscriptionManager = SubscriptionManager.asynchronous();
    // The connect/close/message callbacks mutate and read this map for every connection served by
    // this endpoint instance, so it must tolerate concurrent access.
    private final Map<Session, DefaultWebsocketClientSession> sessions = new ConcurrentHashMap<>();
    private final String path;
    private final WebsocketIo websocketIo;
    private final List<AutowiredWebsocketHandler> handlers;

    /**
     * Per-connection view handed to handlers. Wraps the underlying {@link Session}, keeps
     * arbitrary keyed state for the connection and remembers the broadcast subscriptions made on
     * behalf of the client so they can be released on disconnect.
     *
     * <p>The instance also serves as the {@link WriteCallback} for its own asynchronous writes.
     */
    public class DefaultWebsocketClientSession implements WebsocketClientSession, WriteCallback {
        private final Map<String, Object> state = new HashMap<>();
        private final Map<Object, Long> broadcastTopicToHandle = new HashMap<>();
        private final Session session;
        private final Action cleanupAction;

        /**
         * @param session the underlying websocket session
         * @param action  invoked when {@link #send(Object)} finds the session no longer open
         */
        DefaultWebsocketClientSession(Session session, Action action) {
            this.session = session;
            this.cleanupAction = action;
        }

        /**
         * Serializes and sends the message if the session is still open; otherwise logs a warning
         * and runs the cleanup action supplied at construction.
         *
         * @param obj the message to send
         */
        @Override // io.aboutcode.stage.web.websocket.WebsocketClientSession
        public void send(Object obj) {
            if (!this.session.isOpen()) {
                WebsocketEndpoint.logger.warn("Unresponsive client at '{}', closing connection", this.session.getRemoteAddress());
                this.cleanupAction.accept();
                return;
            }
            sendRequest(this.session, obj);
        }

        /**
         * Closes the underlying session with the given status code and reason.
         */
        @Override // io.aboutcode.stage.web.websocket.WebsocketClientSession
        public void close(int i, String str) {
            this.session.close(i, str);
        }

        /**
         * Serializes the message and, if serialization yields a payload, writes it to the remote
         * with this object as the write callback. Serialization failures are logged, not rethrown.
         */
        private void sendRequest(Session session, Object obj) {
            try {
                WebsocketEndpoint.this.websocketIo.serialize(obj)
                        .ifPresent(payload -> session.getRemote().sendString(payload, this));
            } catch (IOException e) {
                WebsocketEndpoint.logger.error("Error serializing message '{}': {}", obj, e.getMessage(), e);
            }
        }

        /**
         * Subscribes this client to the given broadcast topic unless a subscription for that topic
         * is already recorded for this client.
         *
         * @param obj the topic to subscribe to
         */
        @Override // io.aboutcode.stage.web.websocket.WebsocketClientSession
        public void subscribeClientToTopic(Object obj) {
            this.broadcastTopicToHandle.computeIfAbsent(obj, topic -> {
                return (Long) WebsocketEndpoint.this.broadcastSubscriptionManager.subscribe(topic, (Object) null, this::send);
            });
        }

        /**
         * Unsubscribes every subscription handle recorded for this client.
         */
        void unsubscribeClientFromAll() {
            this.broadcastTopicToHandle.values()
                    .forEach(handle -> WebsocketEndpoint.this.broadcastSubscriptionManager.unsubscribe(handle));
        }

        /**
         * Stores a value under the given key, replacing any previous value.
         */
        @Override // io.aboutcode.stage.web.websocket.WebsocketClientSession
        public void addState(String str, Object obj) {
            synchronized (this.state) {
                this.state.put(str, obj);
            }
        }

        /**
         * Removes the value stored under the given key.
         *
         * @return the removed value, or empty if the key was absent or mapped to {@code null}
         */
        @Override // io.aboutcode.stage.web.websocket.WebsocketClientSession
        public Optional<Object> removeState(String str) {
            synchronized (this.state) {
                // remove() yields null for an absent key, which ofNullable maps to empty.
                return Optional.ofNullable(this.state.remove(str));
            }
        }

        /**
         * Looks up the value stored under the given key.
         *
         * @return the stored value, or empty if the key is absent or mapped to {@code null}
         */
        @Override // io.aboutcode.stage.web.websocket.WebsocketClientSession
        public Optional<Object> getState(String str) {
            synchronized (this.state) {
                // get() yields null for an absent key, which ofNullable maps to empty.
                return Optional.ofNullable(this.state.get(str));
            }
        }

        /**
         * Logs a failed asynchronous write; the failure is not propagated.
         */
        public void writeFailed(Throwable th) {
            WebsocketEndpoint.logger.error("Error in sending message to websocket connection: {}", th, th);
        }

        /**
         * Nothing to do when a write completes successfully.
         */
        public void writeSuccess() {
        }

        /**
         * @return all headers of the upgrade request that established this session
         */
        @Override // io.aboutcode.stage.web.websocket.WebsocketClientSession
        public Map<String, List<String>> headers() {
            return this.session.getUpgradeRequest().getHeaders();
        }

        /**
         * @return the values of the named upgrade request header
         */
        @Override // io.aboutcode.stage.web.websocket.WebsocketClientSession
        public List<String> headers(String str) {
            return this.session.getUpgradeRequest().getHeaders(str);
        }

        /**
         * @return the named upgrade request header, or empty if the request returns {@code null}
         */
        @Override // io.aboutcode.stage.web.websocket.WebsocketClientSession
        public Optional<String> header(String str) {
            return Optional.ofNullable(this.session.getUpgradeRequest().getHeader(str));
        }
    }

    /**
     * @param str         the path reported by {@link #getPath()}
     * @param websocketIo serializer/deserializer for outgoing and incoming messages
     * @param list        handlers that receive connection events and incoming messages
     */
    public WebsocketEndpoint(String str, WebsocketIo websocketIo, List<AutowiredWebsocketHandler> list) {
        this.path = str;
        this.websocketIo = websocketIo;
        this.handlers = list;
    }

    /**
     * Initializes every handler with a {@link WebsocketContext} that publishes through this
     * endpoint's broadcast subscription manager. A fresh context object is created per handler.
     */
    public void initialize() {
        this.handlers.forEach(handler -> handler.initialize(new WebsocketContext() {
            @Override // io.aboutcode.stage.web.websocket.WebsocketContext
            public void publishToSubscribedClients(String topic, Object message) {
                WebsocketEndpoint.this.broadcastSubscriptionManager.forTopic(topic, (consumer, ignored, subscriberContext) -> {
                    consumer.accept(message);
                });
            }

            @Override // io.aboutcode.stage.web.websocket.WebsocketContext
            public void publishToAllClients(Object message) {
                WebsocketEndpoint.this.broadcastSubscriptionManager.forAll((consumer, ignored, subscriberContext) -> {
                    consumer.accept(message);
                });
            }
        }));
    }

    /**
     * Deserializes an incoming text message and, if that yields a value, passes it to every
     * handler and sends all collected handler results back to the originating client.
     * Messages for a session that is not (or no longer) registered are dropped with a warning.
     * Deserialization failures are logged, not rethrown.
     *
     * @param session the session the message arrived on
     * @param str     the raw text message
     */
    @OnWebSocketMessage
    public void onMessage(Session session, String str) {
        try {
            this.websocketIo.deserialize(str).ifPresent(request -> {
                DefaultWebsocketClientSession clientSession = this.sessions.get(session);
                if (clientSession == null) {
                    // The session was already cleaned up (or never registered); handlers must
                    // not be invoked with a null client session.
                    logger.warn("Received message for unknown session at '{}', ignoring", session.getRemoteAddress());
                    return;
                }
                // Collect all responses first so every handler has run before anything is sent.
                this.handlers.stream()
                        .map(handler -> handler.invokeDataHandlers(request, clientSession))
                        .flatMap(responses -> responses.stream())
                        .collect(Collectors.toList())
                        .forEach(clientSession::send);
            });
        } catch (IOException e) {
            logger.error("Message '{}' not handled by endpoint: {}", str, e.getMessage(), e);
        }
    }

    /**
     * Registers a client session for the new connection and notifies every handler.
     *
     * @param session the newly connected session
     */
    @OnWebSocketConnect
    public void onConnect(Session session) {
        // The cleanup action routes through onDisconnect; the status passed here is only logged.
        DefaultWebsocketClientSession clientSession = new DefaultWebsocketClientSession(
                session,
                () -> onDisconnect(session, 404, "Remote session terminated unexpectedly"));
        this.sessions.put(session, clientSession);
        this.handlers.forEach(handler -> handler.onConnected(clientSession));
    }

    /**
     * Removes the client session, releases its broadcast subscriptions and notifies every handler.
     * Does nothing beyond logging if the session is not registered, which happens when cleanup
     * already ran for it (for example via a failed send followed by the close callback).
     *
     * @param session the session being closed
     * @param i       the close status, used for logging
     * @param str     the close reason, used for logging
     */
    @OnWebSocketClose
    public void onDisconnect(Session session, int i, String str) {
        logger.debug("Disconnecting with status '{}' because: {}", i, str);
        DefaultWebsocketClientSession clientSession = this.sessions.remove(session);
        if (clientSession == null) {
            // Already disconnected; avoid a NullPointerException and duplicate notifications.
            return;
        }
        clientSession.unsubscribeClientFromAll();
        this.handlers.forEach(handler -> handler.onDisconnected(clientSession));
    }

    /**
     * Logs an error reported for a websocket connection.
     */
    @OnWebSocketError
    public void onError(Session session, Throwable th) {
        logger.error("Error in websocket connection", th);
    }

    /**
     * @return the path supplied at construction
     */
    public String getPath() {
        return this.path;
    }
}
