package com.zx.sms.session;

import PduParser.CharacterSets;
import com.zx.sms.BaseMessage;
import com.zx.sms.common.SendFailException;
import com.zx.sms.common.SmsLifeTerminateException;
import com.zx.sms.common.storedMap.VersionObject;
import com.zx.sms.common.util.CachedMillisecondClock;
import com.zx.sms.config.PropertiesUtils;
import com.zx.sms.connect.manager.EndpointConnector;
import com.zx.sms.connect.manager.EndpointEntity;
import com.zx.sms.connect.manager.EndpointManager;
import com.zx.sms.session.cmpp.SessionState;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/zx/sms/session/AbstractSessionStateManager.class */
/**
 * Duplex channel handler that keeps track of outbound request messages until the matching
 * response is read, re-sending them on a shared scheduler when no response arrives.
 *
 * @param <K> type of the sequence id used as the key of the pending-message maps
 * @param <T> message type handled by this session
 */
public abstract class AbstractSessionStateManager<K, T extends BaseMessage> extends ChannelDuplexHandler {
    /** Per-endpoint logger, named {@code "error." + entity id} in the constructor. */
    private final Logger errlogger;
    /** Endpoint this handler belongs to; source of the retry settings read by this class. */
    private EndpointEntity entity;
    /** Requests awaiting a response, keyed by sequence id; supplied by the caller of the constructor. */
    private final ConcurrentMap<K, VersionObject<T>> storeMap;
    /** Context captured in {@code handlerAdded}; used by {@code writeMessagesync}. */
    private ChannelHandlerContext ctx;
    /** When true, messages already present in {@code storeMap} are re-sent after the session connects. */
    private boolean preSend;
    /** Class-wide logger. */
    private static final Logger logger = LoggerFactory.getLogger(AbstractSessionStateManager.class);
    /**
     * Scheduler shared by every session for retry and delayed re-write tasks. The pool size is
     * read from the {@code GlobalMsgResendThreadCount} property (default {@code "4"}); worker
     * threads are daemon threads at normal priority and rejected tasks are silently discarded.
     */
    private static final ScheduledThreadPoolExecutor msgResend = new ScheduledThreadPoolExecutor(
            Integer.valueOf(PropertiesUtils.getproperties("GlobalMsgResendThreadCount", "4")),
            new ThreadFactory() {
                private final AtomicInteger threadNumber = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable runnable) {
                    Thread worker = new Thread(runnable, "msgResend-" + threadNumber.getAndIncrement());
                    worker.setDaemon(true);
                    if (worker.getPriority() != Thread.NORM_PRIORITY) {
                        worker.setPriority(Thread.NORM_PRIORITY);
                    }
                    return worker;
                }
            },
            new ThreadPoolExecutor.DiscardPolicy());
    /** Number of objects seen by {@code channelRead}. */
    private long msgReadCount = 0;
    /** Number of request writes, including retries issued by the retry task. */
    private long msgWriteCount = 0;
    /** Creation time of this handler; stored messages with a smaller version are treated as left over from an earlier session. */
    private final long version = System.currentTimeMillis();
    /** Retry bookkeeping for requests written through this handler, keyed by sequence id. */
    private final ConcurrentHashMap<K, AbstractSessionStateManager<K, T>.Entry> msgRetryMap = new ConcurrentHashMap<>();
    /** Set by {@code preSendMsg}: true once the pre-send pass finished without the channel going inactive. */
    private boolean preSendover = false;

    /**
     * Retry bookkeeping for a single pending request: the scheduled retry task, the number of
     * send attempts so far, the request itself, whether it was written through the synchronous
     * API, and the promise completed when the response (or a failure) is known.
     */
    public class Entry {
        volatile Future future;
        AtomicInteger cnt = new AtomicInteger(1);
        T request;
        boolean sync;
        DefaultPromise<T> resfuture;

        Entry(T request, boolean sync) {
            this.request = request;
            this.sync = sync;
        }
    }

    /**
     * Creates a session state manager.
     *
     * @param endpointEntity endpoint this session belongs to; its id names the error logger
     * @param concurrentMap  store of requests awaiting a response, keyed by sequence id
     * @param z              whether messages already in the store are re-sent after connecting
     */
    public AbstractSessionStateManager(EndpointEntity endpointEntity, ConcurrentMap<K, VersionObject<T>> concurrentMap, boolean z) {
        String errorLoggerName = "error." + endpointEntity.getId();
        this.entity = endpointEntity;
        this.errlogger = LoggerFactory.getLogger(errorLoggerName);
        this.storeMap = concurrentMap;
        this.preSend = z;
    }

    /**
     * @return the current size of the store of requests awaiting a response
     */
    public int getWaittingResp() {
        final int pending = storeMap.size();
        return pending;
    }

    /**
     * @return how many objects {@code channelRead} has seen so far
     */
    public long getReadCount() {
        return msgReadCount;
    }

    /**
     * @return how many request writes (including retries) have been issued so far
     */
    public long getWriteCount() {
        return msgWriteCount;
    }

    /**
     * Remembers the handler context so that {@code writeMessagesync} can write without being
     * handed a context by the caller.
     */
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
        ctx = channelHandlerContext;
    }

    /**
     * Sets user-defined writability flag 31 on the channel's outbound buffer; does nothing when
     * the channel has no outbound buffer.
     *
     * @param channelHandlerContext context whose channel is affected
     * @param z                     new value of the flag
     */
    public void setUserDefinedWritability(ChannelHandlerContext channelHandlerContext, boolean z) {
        ChannelOutboundBuffer buffer = channelHandlerContext.channel().unsafe().outboundBuffer();
        if (buffer == null) {
            return;
        }
        buffer.setUserDefinedWritability(31, z);
    }

    /**
     * Cleans up after the channel closes. On the channel's executor, every entry of the retry
     * map has its retry task cancelled and its response promise failed with an
     * {@link IOException}; when another active channel of the same endpoint is available, the
     * request is either re-sent through it (endpoint allows re-sending and the request was not a
     * synchronous write) or reported on the error logger. If pre-sending is enabled but did not
     * complete, stored messages older than this handler are forwarded to another active channel.
     * The inactive event is then passed on.
     */
    public void channelInactive(final ChannelHandlerContext channelHandlerContext) throws Exception {
        Runnable cleanup = new Runnable() {
            @Override
            public void run() {
                EndpointConnector<?> connector = EndpointManager.INS.getEndpointConnector(entity);

                Iterator<Map.Entry<K, Entry>> pendingIt = msgRetryMap.entrySet().iterator();
                while (pendingIt.hasNext()) {
                    Entry pending = pendingIt.next().getValue();
                    T request = pending.request;
                    boolean async = !pending.sync;

                    // A fresh channel is fetched for every pending request, as before.
                    Channel alternative = connector == null ? null : connector.fetch();
                    if (alternative != null && alternative.isActive()) {
                        if (entity.isReSendFailMsg() && async) {
                            alternative.writeAndFlush(request);
                            logger.warn("current channel {} is closed.send requestMsg {} from other channel {} which is active.", new Object[]{channelHandlerContext.channel(), request, alternative});
                        } else {
                            errlogger.error("Channel closed . Msg {} may not send Success. ", request);
                        }
                    }

                    cancelRetry(pending, channelHandlerContext.channel());
                    responseFutureDone(pending, new IOException("channel closed."));
                    pendingIt.remove();
                }

                boolean replayStored = preSend && !preSendover;
                if (!replayStored || connector == null) {
                    return;
                }
                for (Map.Entry<K, VersionObject<T>> stored : storeMap.entrySet()) {
                    Channel target = connector.fetch();
                    if (target == null || !target.isActive()) {
                        continue;
                    }
                    VersionObject<T> versioned = stored.getValue();
                    long storedVersion = versioned.getVersion();
                    T message = versioned.getObj();
                    // Only messages stored before this handler was created are forwarded.
                    if (AbstractSessionStateManager.this.version > storedVersion && message != null) {
                        logger.debug("Send last failed msg . {}", message);
                        target.writeAndFlush(message);
                    }
                }
            }
        };
        channelHandlerContext.executor().execute(cleanup);
        channelHandlerContext.fireChannelInactive();
    }

    /**
     * Returns the sequence id of a message. The value is used as the key of both the store of
     * pending requests and the retry map, and is looked up again with the id of the response.
     */
    protected abstract K getSequenceId(T t);

    /**
     * Decides whether a request must be sent again after its response was read.
     * {@code channelRead} passes the stored request as {@code t} and the response as {@code t2};
     * when this returns true the request is re-written after a short delay instead of being
     * completed.
     */
    protected abstract boolean needSendAgainByResponse(T t, T t2);

    /**
     * Decides whether the channel is closed once a request has used up its retries; called by
     * the retry task with the request that failed.
     */
    protected abstract boolean closeWhenRetryFailed(T t);

    /**
     * Matches an inbound response with its pending request and either completes the request or
     * schedules it to be written again; every inbound object is then passed on.
     *
     * <p>For a response whose request is found in the store: the request is attached to the
     * response, the round-trip delay is checked (a delay above a quarter of the retry wait time
     * pauses writes for one second), the retry task is cancelled, and then either the request is
     * re-written after 400 ms (when {@code needSendAgainByResponse} says so, with writes paused
     * for 40 ms) or the response promise is completed and the retry entry removed.
     *
     * @param channelHandlerContext context of this handler
     * @param obj                   inbound object; only response {@link BaseMessage}s are processed
     */
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        this.msgReadCount++;
        if ((obj instanceof BaseMessage) && ((BaseMessage) obj).isResponse()) {
            // Unchecked: messages on this pipeline are expected to be of the session's type T.
            T response = (T) obj;
            K sequenceId = getSequenceId(response);
            VersionObject<T> stored = this.storeMap.remove(sequenceId);
            if (stored != null) {
                T request = stored.getObj();
                response.setRequest(request);
                long delay = delaycheck(stored.getVersion());
                // Retry wait time is in seconds, the measured delay in milliseconds. The decompiled
                // code multiplied by CharacterSets.UCS2 — presumably 1000, the UCS-2 MIBenum; confirm.
                if (delay > TimeUnit.SECONDS.toMillis(this.entity.getRetryWaitTimeSec()) / 4) {
                    this.errlogger.warn("delaycheck . delay :{} , SequenceId :{}", Long.valueOf(delay), sequenceId);
                    setchannelunwritable(channelHandlerContext, 1000L);
                }
                AbstractSessionStateManager<K, T>.Entry entry = this.msgRetryMap.get(sequenceId);
                if (needSendAgainByResponse(request, response)) {
                    cancelRetry(entry, channelHandlerContext.channel());
                    setchannelunwritable(channelHandlerContext, 40L);
                    // The retry entry may be missing (already removed, or the request was written
                    // through another handler sharing the store); fall back to the stored request
                    // instead of dereferencing a null entry.
                    T resend = entry != null ? entry.request : request;
                    reWriteLater(channelHandlerContext, resend, channelHandlerContext.newPromise(), 400);
                } else {
                    cancelRetry(entry, channelHandlerContext.channel());
                    responseFutureDone(entry, response);
                    this.msgRetryMap.remove(sequenceId);
                }
            } else {
                this.errlogger.warn("receive ResponseMessage ,but not found related Request Msg. response:{}", response);
            }
        }
        channelHandlerContext.fireChannelRead(obj);
    }

    /**
     * @param j a timestamp on the cached millisecond clock
     * @return the cached clock's current value minus {@code j}
     */
    private long delaycheck(long j) {
        long now = CachedMillisecondClock.INS.now();
        return now - j;
    }

    /**
     * Marks a currently writable channel as unwritable and schedules, on the channel's executor,
     * the restoration of the writability flag after {@code j} milliseconds. A channel that is
     * already unwritable is left alone.
     */
    private void setchannelunwritable(final ChannelHandlerContext channelHandlerContext, long j) {
        if (!channelHandlerContext.channel().isWritable()) {
            return;
        }
        setUserDefinedWritability(channelHandlerContext, false);
        Runnable restore = new Runnable() {
            @Override
            public void run() {
                setUserDefinedWritability(channelHandlerContext, true);
            }
        };
        channelHandlerContext.executor().schedule(restore, j, TimeUnit.MILLISECONDS);
    }

    /**
     * Routes outbound writes: request messages go through the tracked write path, everything
     * else (responses and non-message objects) is written straight through.
     */
    @SuppressWarnings("unchecked")
    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        boolean trackedRequest = obj instanceof BaseMessage && ((BaseMessage) obj).isRequest();
        if (trackedRequest) {
            writeWithWindow(channelHandlerContext, (T) obj, channelPromise);
            return;
        }
        channelHandlerContext.write(obj, channelPromise);
    }

    /**
     * Starts the pre-send pass on the channel's executor when the {@code SessionState.Connect}
     * event arrives; the event is always passed on.
     */
    public void userEventTriggered(final ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        boolean connected = obj == SessionState.Connect;
        if (connected) {
            Runnable replay = new Runnable() {
                @Override
                public void run() {
                    preSendMsg(channelHandlerContext);
                }
            };
            channelHandlerContext.executor().execute(replay);
        }
        channelHandlerContext.fireUserEventTriggered(obj);
    }

    /**
     * Writes a request through the tracked write path. An exception raised while doing so fails
     * the promise and is logged (its cause, when it has one) rather than propagated.
     *
     * @return always {@code true}
     */
    private boolean writeWithWindow(ChannelHandlerContext channelHandlerContext, T t, ChannelPromise channelPromise) {
        try {
            safewrite(channelHandlerContext, t, channelPromise, false);
        } catch (Exception e) {
            channelPromise.tryFailure(e);
            Throwable reported = e.getCause();
            if (reported == null) {
                reported = e;
            }
            logger.error("writeWithWindow: ", reported);
        }
        return true;
    }

    /**
     * Schedules the periodic retry task for a request that was written successfully.
     *
     * <p>The task runs on the shared scheduler every {@code retryWaitTimeSec} seconds while the
     * channel is active. Below the endpoint's maximum retry count it writes the request again;
     * once the count is reached it cancels itself, fails the response promise with a
     * {@link SendFailException}, drops the request from both maps and, if
     * {@code closeWhenRetryFailed} says so, closes the channel.
     *
     * @param channelHandlerContext context used for the retry writes
     * @param t                     request to retry
     */
    public void scheduleRetryMsg(final ChannelHandlerContext channelHandlerContext, final T t) {
        final K sequenceId = getSequenceId(t);
        final AbstractSessionStateManager<K, T>.Entry entry = this.msgRetryMap.get(sequenceId);
        if (entry == null) {
            logger.warn("receive seq {} not exists in msgRetryMap,maybe response received before create retrytask .", sequenceId);
            return;
        }
        // Lets the task cancel itself even if entry.future has been replaced or cleared meanwhile.
        final AtomicReference<Future<?>> selfRef = new AtomicReference<Future<?>>();
        ScheduledFuture<?> retryTask = msgResend.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    if (channelHandlerContext.channel().isActive()) {
                        int attempts = entry.cnt.get();
                        AbstractSessionStateManager.logger.warn("entity : {} , retry Send Msg : {}", AbstractSessionStateManager.this.entity.getId(), t);
                        if (attempts < AbstractSessionStateManager.this.entity.getMaxRetryCnt()) {
                            // NOTE(review): plain long incremented from a scheduler thread; the counter is approximate.
                            AbstractSessionStateManager.this.msgWriteCount++;
                            entry.cnt.incrementAndGet();
                            channelHandlerContext.writeAndFlush(t, channelHandlerContext.newPromise());
                            return;
                        }
                        Future<?> self = selfRef.get();
                        if (self != null) {
                            self.cancel(false);
                        }
                        AbstractSessionStateManager.this.cancelRetry(entry, channelHandlerContext.channel());
                        AbstractSessionStateManager.this.responseFutureDone(entry, new SendFailException("retry send msg over " + attempts + " times"));
                        AbstractSessionStateManager.this.msgRetryMap.remove(sequenceId);
                        AbstractSessionStateManager.this.storeMap.remove(sequenceId);
                        AbstractSessionStateManager.this.errlogger.error("entity : {} , RetryFailed: {}", AbstractSessionStateManager.this.entity.getId(), t);
                        if (AbstractSessionStateManager.this.closeWhenRetryFailed(t)) {
                            AbstractSessionStateManager.logger.error("entity : {} , retry send {} times Message {} ,the connection may die.close it", new Object[]{AbstractSessionStateManager.this.entity.getId(), Integer.valueOf(attempts), t});
                            channelHandlerContext.close();
                        }
                    }
                } catch (Throwable th) {
                    // An exception escaping a fixed-delay task would silently stop all further runs.
                    AbstractSessionStateManager.logger.error("retry Send Msg Error: {}", t);
                    AbstractSessionStateManager.logger.error("retry send Msg Error.", th);
                }
            }
        }, this.entity.getRetryWaitTimeSec(), this.entity.getRetryWaitTimeSec(), TimeUnit.SECONDS);
        selfRef.set(retryTask);
        entry.future = retryTask;
        // The response may have arrived while the task was being created. Compare by identity so
        // that a different entry registered under the same sequence id also counts as "gone";
        // otherwise the stale task would keep re-sending and eventually evict the newer entry.
        if (this.msgRetryMap.get(sequenceId) != entry) {
            cancelRetry(entry, channelHandlerContext.channel());
        }
    }

    /**
     * Completes the response promise of a retry entry with the received response.
     *
     * <p>Uses {@code trySuccess} so that a promise already completed elsewhere (for example
     * failed by the retry task or on channel close) does not make the read path throw.
     *
     * @param entry retry entry, may be {@code null}
     * @param t     response to complete the promise with
     * @return the entry, or {@code null} when there is no entry or it has no promise
     */
    private AbstractSessionStateManager<K, T>.Entry responseFutureDone(AbstractSessionStateManager<K, T>.Entry entry, T t) {
        if (entry == null || entry.resfuture == null) {
            return null;
        }
        entry.resfuture.trySuccess(t);
        return entry;
    }

    /**
     * Fails the response promise of a retry entry; a promise that is already complete is left
     * as it is.
     *
     * @param entry retry entry, may be {@code null}
     * @param th    failure cause
     * @return the entry, or {@code null} when there is no entry or it has no promise
     */
    public AbstractSessionStateManager<K, T>.Entry responseFutureDone(AbstractSessionStateManager<K, T>.Entry entry, Throwable th) {
        if (entry == null || entry.resfuture == null) {
            return null;
        }
        entry.resfuture.tryFailure(th);
        return entry;
    }

    /**
     * Cancels the retry task of an entry, removes it from the shared scheduler's queue and
     * clears the entry's task reference.
     *
     * <p>The volatile task reference is read once into a local: this method is reached from both
     * the channel's executor and the scheduler threads, and re-reading the field after the null
     * check could observe a concurrent clear.
     *
     * @param entry   retry entry, may be {@code null}
     * @param channel not used by this method
     * @return the entry that was passed in
     */
    public AbstractSessionStateManager<K, T>.Entry cancelRetry(AbstractSessionStateManager<K, T>.Entry entry, Channel channel) {
        Future<?> task = entry == null ? null : entry.future;
        if (task == null) {
            logger.debug("cancelRetry task failed.");
            return entry;
        }
        task.cancel(false);
        if (task instanceof RunnableScheduledFuture) {
            msgResend.remove((RunnableScheduledFuture<?>) task);
        }
        entry.future = null;
        return entry;
    }

    /**
     * Re-sends stored messages that are older than this handler (when pre-sending is enabled)
     * and records whether the pass ran to completion. The pass stops early, and is recorded as
     * incomplete, as soon as the channel is found inactive.
     */
    public void preSendMsg(ChannelHandlerContext channelHandlerContext) {
        boolean interrupted = false;
        if (preSend) {
            for (Map.Entry<K, VersionObject<T>> stored : storeMap.entrySet()) {
                if (!channelHandlerContext.channel().isActive()) {
                    interrupted = true;
                    break;
                }
                VersionObject<T> versioned = stored.getValue();
                long storedVersion = versioned.getVersion();
                T message = versioned.getObj();
                boolean fromEarlierSession = this.version > storedVersion;
                if (fromEarlierSession && message != null) {
                    logger.debug("Send last failed msg . {}", message);
                    writeWithWindow(channelHandlerContext, message, channelHandlerContext.newPromise());
                }
            }
        }
        preSendover = !interrupted;
    }

    /**
     * Writes a request and registers it for response matching and retry.
     *
     * <p>The write is rejected (both promises failed) when the channel is inactive or the
     * message reports itself as terminated. When another, different message is already pending
     * under the same sequence id, an asynchronous write is re-attempted after 250 ms and a
     * synchronous write is rejected. Otherwise the request is recorded in the retry map and the
     * store, written and flushed; on write success the retry task is scheduled, on write failure
     * the bookkeeping is removed and the response promise failed.
     *
     * @param channelHandlerContext context to write through
     * @param t                     request to write
     * @param channelPromise        promise of the channel write
     * @param z                     true when called through the synchronous API
     * @return promise completed with the response or a failure; {@code null} only when an
     *         asynchronous write was deferred because of a sequence id clash
     */
    private Promise<T> safewrite(final ChannelHandlerContext channelHandlerContext, final T t, ChannelPromise channelPromise, boolean z) {
        if (!channelHandlerContext.channel().isActive()) {
            IOException closed = new IOException("Connection " + channelHandlerContext.channel() + " has closed");
            return rejectWrite(channelHandlerContext, channelPromise, closed);
        }
        if (t.isTerminated()) {
            this.errlogger.error("Msg Life over .{}", t);
            return rejectWrite(channelHandlerContext, channelPromise, new SmsLifeTerminateException("Msg Life over"));
        }

        final K sequenceId = getSequenceId(t);
        AbstractSessionStateManager<K, T>.Entry candidate = new Entry(t, z);
        candidate.resfuture = new DefaultPromise<T>(channelHandlerContext.executor());
        // Single atomic registration: writes reach this method from the channel's executor and
        // from the shared scheduler (delayed re-writes), so containsKey/get/put could interleave.
        AbstractSessionStateManager<K, T>.Entry existing = this.msgRetryMap.putIfAbsent(sequenceId, candidate);
        if (existing != null && !t.equals(existing.request)) {
            logger.error("has repeat Sequense {}", sequenceId);
            if (!z) {
                reWriteLater(channelHandlerContext, t, channelPromise, 250);
                return null;
            }
            IOException clash = new IOException("seqId:" + sequenceId
                    + ".it Has a same sequenceId with another message:" + existing.request + ". wait it complete.");
            return rejectWrite(channelHandlerContext, channelPromise, clash);
        }
        // Re-writing an already tracked request keeps its original entry and promise.
        final AbstractSessionStateManager<K, T>.Entry tracked = existing != null ? existing : candidate;

        this.msgWriteCount++;
        this.storeMap.put(sequenceId, new VersionObject<T>(t));
        channelPromise.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    AbstractSessionStateManager.this.scheduleRetryMsg(channelHandlerContext, t);
                    return;
                }
                AbstractSessionStateManager.logger.error("remove fail message Sequense {}", sequenceId);
                AbstractSessionStateManager.this.storeMap.remove(sequenceId);
                AbstractSessionStateManager.this.responseFutureDone(AbstractSessionStateManager.this.msgRetryMap.remove(sequenceId), channelFuture.cause());
            }
        });
        channelHandlerContext.writeAndFlush(t, channelPromise);
        return tracked.resfuture;
    }

    /** Fails the channel promise (when present) and returns a new promise already failed with the same cause. */
    private Promise<T> rejectWrite(ChannelHandlerContext channelHandlerContext, ChannelPromise channelPromise, Throwable cause) {
        if (channelPromise != null) {
            channelPromise.tryFailure(cause);
        }
        DefaultPromise<T> rejected = new DefaultPromise<T>(channelHandlerContext.executor());
        rejected.tryFailure(cause);
        return rejected;
    }

    /**
     * Schedules the message to be written again through this handler's {@code write} after a
     * delay, on the shared scheduler.
     *
     * @param channelHandlerContext context to write through
     * @param t                     message to write
     * @param channelPromise        promise of the deferred write
     * @param i                     delay in milliseconds
     */
    private void reWriteLater(final ChannelHandlerContext channelHandlerContext, final T t, final ChannelPromise channelPromise, int i) {
        msgResend.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    AbstractSessionStateManager.this.write(channelHandlerContext, t, channelPromise);
                } catch (Exception e) {
                    // Keep the stack trace and complete the promise so nobody waits on it forever.
                    AbstractSessionStateManager.logger.error("has repeat Sequense ,and write Msg err {}", t, e);
                    if (channelPromise != null) {
                        channelPromise.tryFailure(e);
                    }
                }
            }
        }, i, TimeUnit.MILLISECONDS);
    }

    /**
     * Writes a request through the context captured in {@code handlerAdded}, flagged as a
     * synchronous write.
     *
     * @param t request to write
     * @return the promise returned by the tracked write path
     */
    public Promise<T> writeMessagesync(T t) {
        ChannelHandlerContext context = this.ctx;
        ChannelPromise writePromise = context.newPromise();
        return safewrite(context, t, writePromise, true);
    }

    /**
     * @return the endpoint this session state manager was created for
     */
    public EndpointEntity getEntity() {
        return entity;
    }
}
