package net.dongliu.prettypb.rpc.server;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import net.dongliu.prettypb.rpc.common.MethodInfo;
import net.dongliu.prettypb.rpc.common.ServiceInfo;
import net.dongliu.prettypb.rpc.common.TaskSet;
import net.dongliu.prettypb.rpc.exception.ServiceException;
import net.dongliu.prettypb.rpc.protocol.RpcCancel;
import net.dongliu.prettypb.rpc.protocol.RpcRequest;
import net.dongliu.prettypb.rpc.protocol.WirePayload;
import net.dongliu.prettypb.runtime.ExtensionRegistry;
import net.dongliu.prettypb.runtime.ProtoBufDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/dongliu/prettypb/rpc/server/RpcServerHandler.class */
/**
 * Inbound pipeline handler that serves the RPC traffic of one {@link RpcServerChannel}.
 *
 * <p>Incoming {@link WirePayload}s carrying an {@link RpcRequest} are resolved to a service
 * method, wrapped in a {@code ServerCallTask}, tracked in {@link #taskSet} and handed to the
 * service executor. Payloads carrying an {@link RpcCancel} cancel the tracked task with the same
 * correlation id. Any other payload is passed on to the next handler.
 */
public class RpcServerHandler extends MessageToMessageDecoder<WirePayload> {
    private static final Logger logger = LoggerFactory.getLogger(RpcServerHandler.class);
    private final RpcServerChannel rpcServerChannel;
    private final RpcServiceRegistry rpcServiceRegistry;
    private final ThreadPoolExecutor rpcServiceExecutor;
    private final RpcServerChannelRegistry rpcServerChannelRegistry;
    private final ExtensionRegistry extensionRegistry;
    /** Calls that were accepted on this channel and have not been answered or canceled yet. */
    private final TaskSet<ServerCallTask> taskSet = new TaskSet<>();

    /**
     * Executor job that runs one accepted call and reports its outcome to the client.
     *
     * <p>Every outcome goes through {@code taskSet.consume(...)}; presumably that is what makes
     * sure at most one response/error is sent per call - verify against {@code TaskSet}.
     */
    public class TaskRunnable implements Runnable {
        private final ServerCallTask task;

        TaskRunnable(ServerCallTask serverCallTask) {
            this.task = serverCallTask;
        }

        /**
         * Runs the call unless it was canceled or has already timed out while waiting in the
         * executor queue.
         */
        @Override
        public void run() {
            if (this.task.canceled()) {
                RpcServerHandler.this.taskSet.consume(this.task.id());
                return;
            }
            if (this.task.timeout()) {
                sendError("Rpc call time out");
                // The client has been told the call failed, so the service method must not run.
                return;
            }
            final MethodInfo methodInfo = this.task.getMethodInfo();
            ServiceInfo serviceInfo = methodInfo.getServiceInfo();
            if (serviceInfo.async()) {
                runAsync(methodInfo, serviceInfo);
            } else {
                runSync(methodInfo, serviceInfo);
            }
        }

        /**
         * Invokes an asynchronous service method and answers the call once the returned future
         * completes.
         */
        private void runAsync(final MethodInfo methodInfo, ServiceInfo serviceInfo) {
            try {
                ListenableFuture<?> pending = (ListenableFuture<?>) methodInfo.getImplMethod()
                        .invoke(serviceInfo.getImpl(), this.task.getRequest());
                Futures.addCallback(pending, new FutureCallback<Object>() {
                    @Override
                    public void onSuccess(Object result) {
                        TaskRunnable.this.sendResponse(result, methodInfo.getResponseType());
                    }

                    @Override
                    public void onFailure(Throwable failure) {
                        TaskRunnable.this.sendError(describeFailure(failure));
                        RpcServerHandler.logger.error("service service method execute failed", failure);
                    }
                });
            } catch (IllegalAccessException | InvocationTargetException e) {
                sendError(describeFailure(e));
                RpcServerHandler.logger.error("call service method failed", e);
            } catch (RuntimeException e) {
                // E.g. the method returned null or something that is not a ListenableFuture.
                // Without this the call would never be answered.
                sendError(describeFailure(e));
                RpcServerHandler.logger.error("service service method execute failed", e);
            }
        }

        /** Invokes a synchronous service method and answers the call with its return value. */
        private void runSync(MethodInfo methodInfo, ServiceInfo serviceInfo) {
            try {
                Object result = methodInfo.getImplMethod().invoke(serviceInfo.getImpl(), this.task.getRequest());
                sendResponse(result, methodInfo.getResponseType());
            } catch (IllegalAccessException | InvocationTargetException e) {
                sendError(describeFailure(e));
                RpcServerHandler.logger.error("call service method failed", e);
            } catch (Throwable th) {
                sendError(describeFailure(th));
                RpcServerHandler.logger.error("service service method execute failed", th);
            }
        }

        /**
         * Consumes this task from the task set and sends {@code str} to the client as an RPC error.
         *
         * @param str error text for the client
         */
        public void sendError(String str) {
            RpcServerHandler.this.taskSet.consume(this.task.id(), consumed -> {
                RpcServerHandler.this.rpcServerChannel.sendRpcError(consumed.id(), str);
            });
        }

        /**
         * Consumes this task from the task set and sends {@code obj} to the client as the RPC response.
         *
         * @param obj response message produced by the service method
         * @param cls response type declared for the method
         */
        public void sendResponse(Object obj, Class cls) {
            RpcServerHandler.this.taskSet.consume(this.task.id(), consumed -> {
                RpcServerHandler.this.rpcServerChannel.sendRpcResponse(consumed.id(), obj, cls);
            });
        }
    }

    /**
     * @param rpcServerChannel channel used to send responses and errors back to the client
     * @param rpcServiceRegistry registry used to resolve requested services
     * @param threadPoolExecutor executor on which service methods are run
     * @param rpcServerChannelRegistry registry the channel is removed from when it goes inactive
     * @param extensionRegistry extension registry passed to the request decoder
     */
    public RpcServerHandler(RpcServerChannel rpcServerChannel, RpcServiceRegistry rpcServiceRegistry, ThreadPoolExecutor threadPoolExecutor, RpcServerChannelRegistry rpcServerChannelRegistry, ExtensionRegistry extensionRegistry) {
        this.rpcServerChannelRegistry = rpcServerChannelRegistry;
        this.rpcServerChannel = rpcServerChannel;
        this.rpcServiceRegistry = rpcServiceRegistry;
        this.rpcServiceExecutor = threadPoolExecutor;
        this.extensionRegistry = extensionRegistry;
    }

    /**
     * Dispatches request and cancel payloads; every other payload is forwarded unchanged.
     *
     * <p>No explicit {@code decode(ChannelHandlerContext, Object, List)} bridge is declared here:
     * the compiler generates it, and declaring it by hand clashes with the inherited generic
     * method.
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, WirePayload wirePayload, List<Object> list) throws Exception {
        if (wirePayload.hasRpcRequest()) {
            onRequest(wirePayload.getRpcRequest());
        } else if (wirePayload.hasRpcCancel()) {
            cancel(wirePayload.getRpcCancel());
        } else {
            list.add(wirePayload);
        }
    }

    /** Unregisters the channel and closes the task set once the connection is gone. */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        this.rpcServerChannelRegistry.removeRpcServerChannel(this.rpcServerChannel);
        onClose();
    }

    /** Logs the failure, closes the connection and closes the task set. */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        logger.warn("Exception caught during RPC operation.", th);
        channelHandlerContext.close();
        onClose();
    }

    public RpcServerChannelRegistry getRpcServerChannelRegistry() {
        return this.rpcServerChannelRegistry;
    }

    /**
     * Resolves the requested service method, decodes the request message and schedules the call.
     *
     * <p>Every failure is reported to the client as an RPC error carrying the request's
     * correlation id; no {@link RuntimeException} raised while decoding or dispatching escapes
     * this method.
     *
     * @param rpcRequest request received from the client
     */
    public void onRequest(RpcRequest rpcRequest) {
        long receivedAt = System.currentTimeMillis();
        int correlationId = rpcRequest.getCorrelationId();
        // NOTE(review): this reads as inverted - the package-qualified lookup is used when the
        // request has NO service package, and it is given the method identifier. Kept as is
        // because RpcServiceRegistry is not visible here; confirm against its contract.
        ServiceInfo serviceInfo = rpcRequest.hasServicePackage()
                ? this.rpcServiceRegistry.simpleResolveService(rpcRequest.getServiceIdentifier())
                : this.rpcServiceRegistry.resolveService(rpcRequest.getServicePackage(), rpcRequest.getMethodIdentifier());
        if (serviceInfo == null) {
            String unknownService = "Unknown Service: " + rpcRequest.getServiceIdentifier();
            logger.error("service not found: {}", rpcRequest.getServiceIdentifier());
            this.rpcServerChannel.sendRpcError(correlationId, unknownService);
            return;
        }
        MethodInfo methodInfo = serviceInfo.getMethodInfo(rpcRequest.getMethodIdentifier());
        if (methodInfo == null) {
            String unknownMethod = "Unknown Method: " + rpcRequest.getMethodIdentifier();
            logger.error("rpc method not found: {}", rpcRequest.getMethodIdentifier());
            this.rpcServerChannel.sendRpcError(correlationId, unknownMethod);
            return;
        }

        ServerCallTask serverCallTask;
        try {
            serverCallTask = new ServerCallTask(methodInfo, ProtoBufDecoder.fromBytes(methodInfo.getRequestType(), rpcRequest.getRequestBytes(), this.extensionRegistry), receivedAt, rpcRequest.getTimeoutMs(), correlationId);
        } catch (RuntimeException e) {
            String invalidRequest = "Invalid request protobuf:" + e.getMessage();
            logger.error("invalid request protobuf", e);
            this.rpcServerChannel.sendRpcError(correlationId, invalidRequest);
            return;
        }

        // Kept apart from decoding so that a dispatch problem is not reported to the client as
        // a malformed request.
        try {
            if (this.taskSet.add(serverCallTask)) {
                submit(serverCallTask);
            } else {
                logger.error("add task failed, correlation id: {}", correlationId);
                this.rpcServerChannel.sendRpcError(correlationId, "add task failed");
            }
        } catch (RuntimeException e) {
            logger.error("dispatch rpc request failed", e);
            this.rpcServerChannel.sendRpcError(correlationId, "Dispatch request failed:" + e.getMessage());
        }
    }

    /** Hands the task to the service executor; a rejected task is answered with an error. */
    private void submit(ServerCallTask serverCallTask) {
        try {
            this.rpcServiceExecutor.submit(new TaskRunnable(serverCallTask));
        } catch (RejectedExecutionException rejected) {
            this.taskSet.consume(serverCallTask.id(), consumed -> {
                this.rpcServerChannel.sendRpcError(consumed.id(), "too many request to service");
            });
        }
    }

    /**
     * Consumes the task with the cancel message's correlation id and marks it canceled.
     *
     * @param rpcCancel cancel message received from the client
     */
    public void cancel(RpcCancel rpcCancel) {
        this.taskSet.consume(rpcCancel.getCorrelationId(), this::cancelFromExecutor);
    }

    private void cancelFromExecutor(ServerCallTask serverCallTask) {
        serverCallTask.cancel();
    }

    /** Closes the task set; called when the channel goes inactive or fails. */
    public void onClose() {
        logger.debug("rpc channel closed.");
        this.taskSet.close();
    }

    /**
     * Builds the error text sent to the client for a failed call.
     *
     * <p>A reflective {@link InvocationTargetException} usually has no message of its own, so
     * its cause (the exception the service method threw) is described instead. A throwable
     * without a message is described by its {@code toString()}.
     */
    private static String describeFailure(Throwable failure) {
        Throwable root = failure;
        if (failure instanceof InvocationTargetException && failure.getCause() != null) {
            root = failure.getCause();
        }
        String message = root.getMessage();
        return message != null ? message : root.toString();
    }
}
