package com.alibaba.tc.sp.input;

import com.alibaba.tc.SystemProperty;
import com.alibaba.tc.Threads;
import com.alibaba.tc.exception.UnknownTypeException;
import com.alibaba.tc.network.client.RequestEncoder;
import com.alibaba.tc.offheap.ByteArray;
import com.alibaba.tc.sp.input.kafka.MyKafkaConsumer;
import com.alibaba.tc.table.TableBuilder;
import com.alibaba.tc.table.Type;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/tc/sp/input/KafkaStreamTable.class */
public class KafkaStreamTable extends AbstractStreamTable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaStreamTable.class);

    /** Kafka consumer configuration assembled in the constructor; used for every consumer this table creates. */
    private final Properties properties;
    /** Topic whose partitions are consumed. */
    private final String topic;
    /** Identifier of the form {@code |KafkaStreamTable|<topic>|<hex hashCode>}; appended to thread names and log keys. */
    private final String sign;
    /** Timestamp handed to {@code offsetsForTimes} to resolve the starting offset of each partition. */
    private final long consumeFrom;
    /** Exclusive upper bound compared against each record key; {@code -1} disables the bound. */
    private final long consumeTo;
    /** This server's slot: a partition is consumed here when {@code partition % serverCount == myHash}. */
    private final int myHash;
    /** Divisor used together with {@link #myHash} to pick this server's partitions. */
    private final int serverCount;
    /** Partitions for which a consumer thread has already been started. */
    private final Set<Integer> myPartitions;
    /** Single-threaded scheduler that periodically re-lists the topic's partitions. */
    private final ScheduledExecutorService partitionsDetector;
    /** Consumer threads started so far; interrupted by {@code stop()}. */
    private final List<Thread> consumers;
    /** Index of the {@code __time__} column, which is filled from the record key instead of the JSON payload. */
    private final int timeColumnIndex;
    /** Column names, index-aligned with {@link #types}. */
    private final List<String> stringColumns;
    /** Column types, index-aligned with {@link #stringColumns}. */
    private final List<Type> types;
    /** Quiet period (ms) that must pass after the last partition change before the table reports finished. */
    private long finishDelayMs;
    /**
     * Wall-clock time (ms) of the last partition add/remove.
     * Volatile because it is written on consumer threads and read without a lock by {@code isFinished()};
     * this also makes the 64-bit read atomic.
     */
    private volatile long lastUpdateMs;
    /** Partitions still being consumed; guarded by this object's monitor. */
    private final Set<Integer> partitionSet;
    /**
     * Cached size of {@link #partitionSet}.
     * Volatile so that the lock-free read in {@code isFinished()} observes updates made on other threads.
     */
    private volatile int partitionSetSize;

    /* renamed from: com.alibaba.tc.sp.input.KafkaStreamTable$3, reason: invalid class name */
    /* loaded from: input_file:com/alibaba/tc/sp/input/KafkaStreamTable$3.class */
    /**
     * Compiler-generated lookup table for switching over {@link Type}.
     * The array is indexed by enum ordinal and holds the case label (1-4) used by the
     * switch in {@code newConsumer}; ordinals that are not listed keep the value 0.
     */
    static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$tc$table$Type = buildSwitchMap();

        /**
         * Fills the ordinal-to-case-label table.
         * Each constant is resolved in its own try block so that one constant missing at
         * link time ({@link NoSuchFieldError}) does not prevent the others from being mapped.
         */
        private static int[] buildSwitchMap() {
            final int[] caseByOrdinal = new int[Type.values().length];
            try {
                caseByOrdinal[Type.DOUBLE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant unavailable: its slot stays 0
            }
            try {
                caseByOrdinal[Type.BIGINT.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant unavailable: its slot stays 0
            }
            try {
                caseByOrdinal[Type.INT.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant unavailable: its slot stays 0
            }
            try {
                caseByOrdinal[Type.VARCHAR.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant unavailable: its slot stays 0
            }
            return caseByOrdinal;
        }
    }

    /**
     * Creates a table with no upper consumption bound.
     * Delegates to the six-argument constructor with {@code consumeTo = -1}, the value
     * that disables both the per-record bound check and {@code isFinished()}.
     *
     * @param str  value for the Kafka {@code bootstrap.servers} setting
     * @param str2 value for the Kafka {@code group.id} setting
     * @param str3 topic to consume
     * @param j    timestamp used to resolve the starting offset of each partition
     * @param map  column name to column type mapping
     */
    public KafkaStreamTable(String str, String str2, String str3, long j, Map<String, Type> map) {
        this(str, str2, str3, j, -1L, map);
    }

    /**
     * Creates a table that reads JSON records from a Kafka topic.
     * Builds the consumer configuration, creates the partition-detector scheduler and
     * caches the column names and types in index order. No consumer is started here.
     *
     * @param str  value for the Kafka {@code bootstrap.servers} setting; must not be null
     * @param str2 value for the Kafka {@code group.id} setting; must not be null
     * @param str3 topic to consume; must not be null
     * @param j    timestamp used to resolve the starting offset of each partition
     * @param j2   bound compared against each record key, or {@code -1} for no bound
     * @param map  column name to column type mapping
     */
    public KafkaStreamTable(String str, String str2, String str3, long j, long j2, Map<String, Type> map) {
        super(0, map);

        // Bookkeeping state.
        this.myPartitions = new HashSet<>();
        this.consumers = new ArrayList<>();
        this.partitionSet = new HashSet<>();
        this.finishDelayMs = 30000L;
        this.lastUpdateMs = System.currentTimeMillis();

        // Kafka consumer configuration shared by every consumer of this table.
        this.topic = Objects.requireNonNull(str3);
        final Properties kafkaConfig = new Properties();
        kafkaConfig.put("bootstrap.servers", Objects.requireNonNull(str));
        kafkaConfig.put("group.id", Objects.requireNonNull(str2));
        kafkaConfig.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        kafkaConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConfig.put("max.poll.records", 40000);
        kafkaConfig.put("enable.auto.commit", "true");
        kafkaConfig.put("auto.offset.reset", "none");
        this.properties = kafkaConfig;

        this.sign = "|KafkaStreamTable|" + str3 + "|" + Integer.toHexString(hashCode());
        this.partitionsDetector = Executors.newSingleThreadScheduledExecutor(
                Threads.threadsNamed("partitions_detector" + this.sign));
        this.consumeFrom = j;
        this.consumeTo = j2;
        this.myHash = SystemProperty.getMyHash();
        this.serverCount = SystemProperty.getServerCount();

        // Cache column names and their types side by side, in column order.
        final int columnCount = this.columns.size();
        this.stringColumns = new ArrayList<>(columnCount);
        this.types = new ArrayList<>(columnCount);
        this.timeColumnIndex = this.columns.indexOf(__time__);
        for (ByteArray column : this.columns) {
            final String columnName = column.toString();
            this.stringColumns.add(columnName);
            this.types.add(map.get(columnName));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts a dedicated consumer thread for {@code topicPartition} when this server owns it.
     * A partition is owned when {@code partition % serverCount == myHash}; a partition that
     * already has a consumer is ignored. Each started consumer gets its own bounded queue
     * (capacity 100) appended to {@code arrayBlockingQueueList}.
     *
     * @param topicPartition     partition to consume
     * @param offsetAndTimestamp offset to start from, or {@code null} to start from the
     *                           beginning of the partition
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void newConsumer(final TopicPartition topicPartition, final OffsetAndTimestamp offsetAndTimestamp) {
        final int partition = topicPartition.partition();
        if (partition % this.serverCount != this.myHash) {
            return;
        }
        // Set.add returns false when a consumer for this partition was already started.
        if (!this.myPartitions.add(Integer.valueOf(partition))) {
            return;
        }
        addPartition(partition);
        final int queueIndex = this.arrayBlockingQueueList.size();
        this.arrayBlockingQueueList.add(new ArrayBlockingQueue<>(100));
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                consumePartition(topicPartition, offsetAndTimestamp, queueIndex);
            }
        });
        thread.start();
        this.consumers.add(thread);
    }

    /**
     * Poll loop of one consumer thread: converts every non-empty poll batch into a table and
     * puts it on the queue at {@code queueIndex}. The loop ends when the thread is interrupted
     * or when a record key reaches {@code consumeTo}. The Kafka consumer is always closed on exit.
     */
    private void consumePartition(TopicPartition topicPartition, OffsetAndTimestamp startOffset, int queueIndex) {
        final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(this.properties);
        try {
            final List<TopicPartition> assignment = Arrays.asList(topicPartition);
            consumer.assign(assignment);
            if (null == startOffset) {
                consumer.seekToBeginning(assignment);
            } else {
                consumer.seek(topicPartition, startOffset.offset());
            }
            final Gson gson = new Gson();
            while (!Thread.interrupted()) {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(this.sleepMs));
                if (records.isEmpty()) {
                    continue;
                }
                TableBuilder tableBuilder = new TableBuilder(this.columnTypeMap);
                int rows = 0;
                boolean reachedEnd = false;
                for (ConsumerRecord<Integer, String> record : records) {
                    // NOTE(review): a record without a key fails here with a NullPointerException,
                    // as it did before - confirm that every record on the topic is keyed.
                    int key = record.key().intValue();
                    if (-1 != this.consumeTo && key >= this.consumeTo) {
                        reachedEnd = true;
                        break;
                    }
                    appendKafkaRecord(tableBuilder, gson, key, record.value());
                    rows++;
                }
                // Also flush the rows that preceded the end marker in the final batch;
                // previously they were parsed and then silently discarded.
                if (rows > 0) {
                    this.queueSizeLogger.logQueueSize("input queue size" + this.sign, this.arrayBlockingQueueList);
                    this.recordSizeLogger.logRecordSize("input queue rows" + this.sign, this.arrayBlockingQueueList);
                    this.arrayBlockingQueueList.get(queueIndex).put(tableBuilder.build());
                }
                if (reachedEnd) {
                    removePartition(topicPartition.partition());
                    return;
                }
            }
        } catch (InterruptedException | InterruptException stopRequested) {
            // Interrupted while blocked in put() or inside the Kafka client: treat as a stop request.
        } finally {
            // Per the Kafka client docs, close() throws when the calling thread is interrupted, and
            // Kafka's InterruptException re-asserts the flag, so clear it before closing. The thread
            // is terminating, so the flag does not need to be restored.
            Thread.interrupted();
            try {
                consumer.close();
            } catch (RuntimeException e) {
                logger.warn("{} failed to close consumer of {}", this.sign, topicPartition, e);
            }
        }
    }

    /**
     * Appends one row to {@code tableBuilder}: the time column is filled with {@code key * 1000}
     * and every other column with the same-named member of the JSON payload, converted according
     * to the column type. Missing or JSON-null members are appended as null.
     *
     * @throws UnknownTypeException if a column type is not DOUBLE, BIGINT, INT or VARCHAR
     */
    private void appendKafkaRecord(TableBuilder tableBuilder, Gson gson, int key, String json) {
        JsonObject jsonObject = gson.fromJson(json, JsonObject.class);
        for (int i = 0; i < this.stringColumns.size(); i++) {
            if (i == this.timeColumnIndex) {
                // Multiply as long: an int product overflows for epoch-second sized keys.
                tableBuilder.append(i, Long.valueOf(key * 1000L));
                continue;
            }
            JsonElement jsonElement = jsonObject.get(this.stringColumns.get(i));
            if (null == jsonElement || jsonElement.isJsonNull()) {
                tableBuilder.appendValue(i, null);
                continue;
            }
            Type type = this.types.get(i);
            switch (type) {
                case DOUBLE:
                    tableBuilder.append(i, Double.valueOf(jsonElement.getAsDouble()));
                    break;
                case BIGINT:
                    tableBuilder.append(i, Long.valueOf(jsonElement.getAsLong()));
                    break;
                case INT:
                    tableBuilder.append(i, Integer.valueOf(jsonElement.getAsInt()));
                    break;
                case VARCHAR:
                    tableBuilder.append(i, jsonElement.getAsString());
                    break;
                default:
                    throw new UnknownTypeException(type.name());
            }
        }
    }

    /**
     * Records partition {@code i} as being consumed, refreshes the cached partition count
     * and stamps the time of this change. Runs under this object's monitor.
     */
    private void addPartition(int i) {
        synchronized (this) {
            final Integer partition = Integer.valueOf(i);
            this.partitionSet.add(partition);
            final int remaining = this.partitionSet.size();
            this.partitionSetSize = remaining;
            this.lastUpdateMs = System.currentTimeMillis();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records that partition {@code i} is no longer being consumed, refreshes the cached
     * partition count and stamps the time of this change. Runs under this object's monitor.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void removePartition(int i) {
        synchronized (this) {
            final Integer partition = Integer.valueOf(i);
            this.partitionSet.remove(partition);
            final int remaining = this.partitionSet.size();
            this.partitionSetSize = remaining;
            this.lastUpdateMs = System.currentTimeMillis();
        }
    }

    @Override // com.alibaba.tc.sp.input.AbstractStreamTable, com.alibaba.tc.sp.input.StreamTable
    /**
     * Reports whether consumption is complete.
     *
     * @return {@code true} only when an upper bound is configured ({@code consumeTo != -1}),
     *         no partition is still being consumed, and at least {@code finishDelayMs}
     *         milliseconds have passed since the last partition add/remove
     */
    public boolean isFinished() {
        if (-1 == this.consumeTo) {
            // Unbounded tables never finish.
            return false;
        }
        if (this.partitionSetSize > 0) {
            return false;
        }
        final long quietForMs = System.currentTimeMillis() - this.lastUpdateMs;
        return quietForMs >= this.finishDelayMs;
    }

    @Override // com.alibaba.tc.sp.input.StreamTable
    /**
     * Starts consumption.
     * Lists the topic's partitions, asks Kafka for the offset matching {@code consumeFrom}
     * in each of them and starts a consumer for every partition owned by this server. It then
     * schedules a task (initial delay 0, fixed delay 5 seconds) that re-lists the partitions
     * and starts consumers, reading from the beginning, for partitions not consumed yet.
     */
    public void start() {
        // The metadata consumer is only needed for the lookups below; close it afterwards.
        try (KafkaConsumer<?, ?> metadataConsumer = MyKafkaConsumer.newKafkaConsumer(this.properties)) {
            List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(this.topic);
            // Defensive: tolerate a null result (no metadata for the topic).
            if (partitionInfos != null) {
                Map<TopicPartition, Long> startTimes = new HashMap<>();
                for (PartitionInfo partitionInfo : partitionInfos) {
                    startTimes.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()),
                            Long.valueOf(this.consumeFrom));
                }
                Map<TopicPartition, OffsetAndTimestamp> startOffsets = metadataConsumer.offsetsForTimes(startTimes);
                // A null value (no offset resolved for the timestamp) makes newConsumer start from the beginning.
                for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : startOffsets.entrySet()) {
                    newConsumer(entry.getKey(), entry.getValue());
                }
            }
        }
        this.partitionsDetector.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                detectNewPartitions();
            }
        }, 0L, 5L, TimeUnit.SECONDS);
    }

    /**
     * One run of the partition detector: logs the partitions consumed so far, re-lists the
     * topic's partitions and offers each one to {@code newConsumer} with no start offset.
     * Failures are logged rather than propagated, because an exception escaping a scheduled
     * task cancels all of its future runs.
     */
    private void detectNewPartitions() {
        logger.info("{} partitions: {}", this.sign, this.myPartitions);
        try (KafkaConsumer<?, ?> metadataConsumer = MyKafkaConsumer.newKafkaConsumer(this.properties)) {
            List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(this.topic);
            if (partitionInfos == null) {
                return;
            }
            for (PartitionInfo partitionInfo : partitionInfos) {
                newConsumer(new TopicPartition(this.topic, partitionInfo.partition()), null);
            }
        } catch (InterruptException stopRequested) {
            // The detector thread was interrupted (stop() calls shutdownNow()); nothing left to do.
        } catch (RuntimeException e) {
            logger.warn("{} partition detection failed, will retry", this.sign, e);
        }
    }

    @Override // com.alibaba.tc.sp.input.StreamTable
    /**
     * Stops consumption: shuts down the partition detector, interrupts every consumer
     * thread and clears the partition bookkeeping. Does not wait for the threads to exit.
     */
    public void stop() {
        this.partitionsDetector.shutdownNow();
        for (Thread consumer : this.consumers) {
            consumer.interrupt();
        }
        this.consumers.clear();
        this.myPartitions.clear();
        // partitionSet and its cached size are guarded by this object's monitor in
        // addPartition/removePartition; take the same lock here so a consumer thread that is
        // still winding down cannot mutate the set concurrently.
        synchronized (this) {
            this.partitionSet.clear();
            this.partitionSetSize = 0;
        }
    }
}
