package com.alibaba.tc.sp.input;

import com.alibaba.tc.SystemProperty;
import com.alibaba.tc.offheap.ByteArray;
import com.alibaba.tc.sp.Delay;
import com.alibaba.tc.sp.StreamProcessing;
import com.alibaba.tc.sp.input.SlsParser;
import com.alibaba.tc.table.Column;
import com.alibaba.tc.table.TableBuilder;
import com.alibaba.tc.table.Type;
import com.alibaba.tc.util.DateUtil;
import com.alibaba.tc.util.IpUtil;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.ListShardRequest;
import com.aliyun.openservices.log.response.ListConsumerGroupResponse;
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
import java.text.ParseException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/tc/sp/input/SlsStreamTable.class */
public class SlsStreamTable extends AbstractStreamTable {
    private static final Logger logger = LoggerFactory.getLogger(SlsStreamTable.class);
    private final String endPoint;
    private final String accessId;
    private final String accessKey;
    private final String project;
    private final String logstore;
    private final String consumerGroup;
    private final int consumeFrom;
    private final int consumeTo;
    private final String sign;
    private long finishDelayMs;
    private long lastUpdateMs;
    private final Set<Integer> shardSet;
    private int shardSetSize;
    private final List<ClientWorker> workers;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/tc/sp/input/SlsStreamTable$LogHubProcessor.class */
    public class LogHubProcessor implements ILogHubProcessor {
        private final int threadId;
        private final SlsStreamTable slsStreamTable;
        private int shardId;
        private long mLastCheckTime = 0;

        LogHubProcessor(int i, SlsStreamTable slsStreamTable) {
            this.threadId = i;
            this.slsStreamTable = slsStreamTable;
        }

        public void initialize(int i) {
            this.shardId = i;
            this.slsStreamTable.addShard(i);
        }

        public String process(List<LogGroupData> list, ILogHubCheckPointTracker iLogHubCheckPointTracker) {
            try {
                try {
                    if (!this.slsStreamTable.shardSet.contains(Integer.valueOf(this.shardId))) {
                        long currentTimeMillis = System.currentTimeMillis();
                        if (currentTimeMillis - this.mLastCheckTime > 90000) {
                            try {
                                iLogHubCheckPointTracker.saveCheckPoint(true);
                            } catch (Throwable th) {
                                StreamProcessing.handleException(th);
                            }
                            this.mLastCheckTime = currentTimeMillis;
                        }
                        return null;
                    }
                    final TableBuilder tableBuilder = new TableBuilder(SlsStreamTable.this.columnTypeMap);
                    final HashMap hashMap = new HashMap();
                    Iterator<LogGroupData> it = list.iterator();
                    while (it.hasNext()) {
                        new SlsParser(it.next(), new SlsParser.Callback() { // from class: com.alibaba.tc.sp.input.SlsStreamTable.LogHubProcessor.1
                            private int size = 0;

                            @Override // com.alibaba.tc.sp.input.SlsParser.Callback
                            public void keyValue(byte[] bArr, int i, int i2, int i3, int i4) {
                                if (-1 == i || -1 == i3) {
                                    return;
                                }
                                hashMap.put(new ByteArray(bArr, i, i2), new ByteArray(bArr, i3, i4));
                            }

                            @Override // com.alibaba.tc.sp.input.SlsParser.Callback
                            public void nextLog(int i) {
                                if (hashMap.isEmpty()) {
                                    return;
                                }
                                if (-1 != SlsStreamTable.this.consumeTo && i >= SlsStreamTable.this.consumeTo) {
                                    LogHubProcessor.this.slsStreamTable.removeShard(LogHubProcessor.this.shardId);
                                    return;
                                }
                                long currentTimeMillis2 = System.currentTimeMillis();
                                long j = i * 1000;
                                Delay.DELAY.log("业务延迟" + LogHubProcessor.this.slsStreamTable.sign, j);
                                Delay.DELAY.log("数据间隔" + LogHubProcessor.this.slsStreamTable.sign, currentTimeMillis2);
                                Delay.RESIDENCE_TIME.log("数据滞留" + LogHubProcessor.this.slsStreamTable.sign, currentTimeMillis2 - j);
                                int i2 = 0;
                                for (ByteArray byteArray : SlsStreamTable.this.columns) {
                                    if (byteArray == AbstractStreamTable.__machine_uuid__ || byteArray == AbstractStreamTable.__category__ || byteArray == AbstractStreamTable.__source__ || byteArray == AbstractStreamTable.__topic__) {
                                        i2++;
                                    } else if (byteArray == AbstractStreamTable.__time__) {
                                        int i3 = i2;
                                        i2++;
                                        tableBuilder.append(i3, Long.valueOf(j));
                                    } else {
                                        int i4 = i2;
                                        i2++;
                                        tableBuilder.append(i4, (ByteArray) hashMap.get(byteArray));
                                    }
                                }
                                this.size++;
                                hashMap.clear();
                            }

                            private void addExtraColumn(ByteArray byteArray, ByteArray byteArray2) {
                                if (SlsStreamTable.this.columnNames.contains(byteArray)) {
                                    Column column = tableBuilder.getColumn(byteArray.toString());
                                    for (int i = 0; i < this.size; i++) {
                                        column.add(byteArray2);
                                    }
                                }
                            }

                            @Override // com.alibaba.tc.sp.input.SlsParser.Callback
                            public void end(ByteArray byteArray, ByteArray byteArray2, ByteArray byteArray3, ByteArray byteArray4) {
                                addExtraColumn(AbstractStreamTable.__category__, byteArray);
                                addExtraColumn(AbstractStreamTable.__topic__, byteArray2);
                                addExtraColumn(AbstractStreamTable.__source__, byteArray3);
                                addExtraColumn(AbstractStreamTable.__machine_uuid__, byteArray4);
                            }
                        });
                    }
                    SlsStreamTable.this.queueSizeLogger.logQueueSize("输入队列大小" + this.slsStreamTable.sign, SlsStreamTable.this.arrayBlockingQueueList);
                    SlsStreamTable.this.recordSizeLogger.logRecordSize("输入队列行数" + this.slsStreamTable.sign, SlsStreamTable.this.arrayBlockingQueueList);
                    SlsStreamTable.this.arrayBlockingQueueList.get(this.threadId).put(tableBuilder.build());
                    long currentTimeMillis2 = System.currentTimeMillis();
                    if (currentTimeMillis2 - this.mLastCheckTime > 90000) {
                        try {
                            iLogHubCheckPointTracker.saveCheckPoint(true);
                        } catch (Throwable th2) {
                            StreamProcessing.handleException(th2);
                        }
                        this.mLastCheckTime = currentTimeMillis2;
                    }
                    return null;
                } catch (Throwable th3) {
                    long currentTimeMillis3 = System.currentTimeMillis();
                    if (currentTimeMillis3 - this.mLastCheckTime > 90000) {
                        try {
                            iLogHubCheckPointTracker.saveCheckPoint(true);
                        } catch (Throwable th4) {
                            StreamProcessing.handleException(th4);
                        }
                        this.mLastCheckTime = currentTimeMillis3;
                    }
                    throw th3;
                }
            } catch (Throwable th5) {
                StreamProcessing.handleException(th5);
                long currentTimeMillis4 = System.currentTimeMillis();
                if (currentTimeMillis4 - this.mLastCheckTime > 90000) {
                    try {
                        iLogHubCheckPointTracker.saveCheckPoint(true);
                    } catch (Throwable th6) {
                        StreamProcessing.handleException(th6);
                    }
                    this.mLastCheckTime = currentTimeMillis4;
                }
                return null;
            }
        }

        public void shutdown(ILogHubCheckPointTracker iLogHubCheckPointTracker) {
            try {
                this.slsStreamTable.removeShard(this.shardId);
                iLogHubCheckPointTracker.saveCheckPoint(true);
            } catch (Throwable th) {
                StreamProcessing.handleException(th);
            }
        }
    }

    /* loaded from: input_file:com/alibaba/tc/sp/input/SlsStreamTable$LogHubProcessorFactory.class */
    private class LogHubProcessorFactory implements ILogHubProcessorFactory {
        private final int threadId;
        private final SlsStreamTable slsStreamTable;

        LogHubProcessorFactory(int i, SlsStreamTable slsStreamTable) {
            this.threadId = i;
            this.slsStreamTable = slsStreamTable;
        }

        public ILogHubProcessor generatorProcessor() {
            return new LogHubProcessor(this.threadId, this.slsStreamTable);
        }
    }

    public SlsStreamTable(String str, String str2, String str3, String str4, String str5, String str6, Map<String, Type> map) {
        this(1, str, str2, str3, str4, str5, str6, map);
    }

    public SlsStreamTable(int i, String str, String str2, String str3, String str4, String str5, String str6, Map<String, Type> map) {
        this(i, str, str2, str3, str4, str5, str6, (int) (System.currentTimeMillis() / 1000), -1, map);
    }

    public SlsStreamTable(int i, String str, String str2, String str3, String str4, String str5, String str6, String str7, Map<String, Type> map) throws ParseException {
        this(i, str, str2, str3, str4, str5, str6, (int) (DateUtil.parseDate(str7) / 1000), -1, map);
    }

    public SlsStreamTable(int i, String str, String str2, String str3, String str4, String str5, String str6, String str7, String str8, Map<String, Type> map) throws ParseException {
        this(i, str, str2, str3, str4, str5, str6, (int) (DateUtil.parseDate(str7) / 1000), (int) (DateUtil.parseDate(str8) / 1000), map);
    }

    public SlsStreamTable(int i, String str, String str2, String str3, String str4, String str5, String str6, int i2, int i3, Map<String, Type> map) {
        super(i, map);
        this.finishDelayMs = 30000L;
        this.lastUpdateMs = System.currentTimeMillis();
        this.shardSet = new HashSet();
        this.endPoint = (String) Objects.requireNonNull(str);
        this.accessId = (String) Objects.requireNonNull(str2);
        this.accessKey = (String) Objects.requireNonNull(str3);
        this.project = (String) Objects.requireNonNull(str4);
        this.logstore = (String) Objects.requireNonNull(str5);
        this.consumerGroup = (String) Objects.requireNonNull(str6);
        this.consumeFrom = i2;
        this.consumeTo = i3;
        this.sign = "|SlsStreamTable|" + str4 + "|" + str5 + "|" + Integer.toHexString(hashCode());
        this.workers = new ArrayList(i);
    }

    public synchronized void setFinishDelaySeconds(Duration duration) {
        this.finishDelayMs = duration.toMillis();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void addShard(int i) {
        this.shardSet.add(Integer.valueOf(i));
        this.shardSetSize = this.shardSet.size();
        this.lastUpdateMs = System.currentTimeMillis();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void removeShard(int i) {
        this.shardSet.remove(Integer.valueOf(i));
        this.shardSetSize = this.shardSet.size();
        this.lastUpdateMs = System.currentTimeMillis();
    }

    private void updateCheckpoint(Client client, long j) throws LogException, InterruptedException {
        try {
            Iterator it = client.ListShard(new ListShardRequest(this.project, this.logstore)).GetShards().iterator();
            while (it.hasNext()) {
                int GetShardId = ((Shard) it.next()).GetShardId();
                String GetCursor = client.GetCursor(this.project, this.logstore, GetShardId, j).GetCursor();
                client.UpdateCheckPoint(this.project, this.logstore, this.consumerGroup, GetShardId, GetCursor);
                logger.info("update shardId: {}, cursor: {}", Integer.valueOf(GetShardId), GetCursor);
            }
        } catch (LogException e) {
            if (!"shard not exist".equals(e.getMessage())) {
                throw e;
            }
            Thread.sleep(1000L);
            updateCheckpoint(client, j);
        }
    }

    private ConsumerGroup getConsumerGroup(Client client) throws LogException {
        ListConsumerGroupResponse ListConsumerGroup = client.ListConsumerGroup(this.project, this.logstore);
        if (ListConsumerGroup == null) {
            return null;
        }
        Iterator it = ListConsumerGroup.GetConsumerGroups().iterator();
        while (it.hasNext()) {
            ConsumerGroup consumerGroup = (ConsumerGroup) it.next();
            if (consumerGroup.getConsumerGroupName().equalsIgnoreCase(this.consumerGroup)) {
                return consumerGroup;
            }
        }
        return null;
    }

    @Override // com.alibaba.tc.sp.input.StreamTable
    public void start() {
        try {
            int i = (int) ((5000 * 10) / 1000);
            Client client = new Client(this.endPoint, this.accessId, this.accessKey);
            if (null == getConsumerGroup(client)) {
                client.CreateConsumerGroup(this.project, this.logstore, new ConsumerGroup(this.consumerGroup, i, true));
            }
            updateCheckpoint(client, this.consumeFrom);
            for (int i2 = 0; i2 < this.thread; i2++) {
                LogHubConfig logHubConfig = new LogHubConfig(this.consumerGroup, "consumer_" + IpUtil.getIp() + "_" + SystemProperty.mySign() + "_" + i2, this.endPoint, this.project, this.logstore, this.accessId, this.accessKey, this.consumeFrom);
                logHubConfig.setHeartBeatIntervalMillis(5000L);
                logHubConfig.setTimeoutInSeconds(i);
                logHubConfig.setConsumeInOrder(true);
                logHubConfig.setMaxFetchLogGroupSize(1000);
                ClientWorker clientWorker = new ClientWorker(new LogHubProcessorFactory(i2, this), logHubConfig);
                this.workers.add(clientWorker);
                new Thread((Runnable) clientWorker, "sls-consumer-" + i2).start();
            }
        } catch (LogException | InterruptedException | LogHubClientWorkerException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // com.alibaba.tc.sp.input.AbstractStreamTable, com.alibaba.tc.sp.input.StreamTable
    public boolean isFinished() {
        return -1 != this.consumeTo && this.shardSetSize <= 0 && System.currentTimeMillis() - this.lastUpdateMs >= this.finishDelayMs;
    }

    @Override // com.alibaba.tc.sp.input.StreamTable
    public void stop() {
        try {
            Iterator<ClientWorker> it = this.workers.iterator();
            while (it.hasNext()) {
                it.next().shutdown();
            }
            Thread.sleep(30000L);
        } catch (InterruptedException e) {
            logger.info("interrupted");
        }
    }
}
