package com.alibaba.tc.sp.output;

import com.alibaba.tc.Threads;
import com.alibaba.tc.offheap.ByteArray;
import com.alibaba.tc.sp.QueueSizeLogger;
import com.alibaba.tc.sp.StreamProcessing;
import com.alibaba.tc.table.Column;
import com.alibaba.tc.table.Table;
import com.alibaba.tc.util.IpUtil;
import com.alibaba.tc.util.ScalarUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/tc/sp/output/KafkaOutputTable.class */
/**
 * Output table that serialises every row of the consumed {@link Table}s to a JSON object and
 * publishes it to a Kafka topic.
 *
 * <p>{@link #start()} launches a fixed number of worker tasks, each owning its own
 * {@link KafkaProducer}, plus a background task that re-reads the topic's partition list every
 * five seconds. Each record is sent to a partition chosen at random from the most recently
 * detected list. The record key is a timestamp in whole seconds: the row's {@code __time__}
 * column (taken as epoch milliseconds) when it holds a non-string value, otherwise a wall-clock
 * value sampled when the table was taken from the queue or after the last intermediate flush.
 */
public class KafkaOutputTable extends AbstractOutputTable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaOutputTable.class);
    private final String topic;
    private final Properties properties;
    private final int batchSize;
    /** Latest known partition ids of {@link #topic}; replaced wholesale by {@link #detectPartitions()}. */
    private volatile int[] allPartitions;
    /** Suffix appended to log labels and thread names to tell instances apart. */
    private final String sign;
    private final ScheduledExecutorService partitionsDetector;
    private final ThreadPoolExecutor threadPoolExecutor;
    private final QueueSizeLogger queueSizeLogger;
    protected final QueueSizeLogger recordSizeLogger;
    private final Random random;

    /**
     * Creates an output table with one worker per available processor and a batch size of 40000.
     *
     * @param str  Kafka {@code bootstrap.servers} value; must not be null
     * @param str2 destination topic; must not be null
     */
    public KafkaOutputTable(String str, String str2) {
        this(Runtime.getRuntime().availableProcessors(), 40000, str, str2);
    }

    /**
     * Creates an output table with the given worker count and a batch size of 40000.
     *
     * @param i    number of worker threads
     * @param str  Kafka {@code bootstrap.servers} value; must not be null
     * @param str2 destination topic; must not be null
     */
    public KafkaOutputTable(int i, String str, String str2) {
        this(i, 40000, str, str2);
    }

    /**
     * Creates an output table.
     *
     * @param i    number of worker threads (also passed to the superclass constructor)
     * @param i2   number of rows after which a worker flushes its producer while still inside one table
     * @param str  Kafka {@code bootstrap.servers} value; must not be null
     * @param str2 destination topic; must not be null
     * @throws NullPointerException if {@code str} or {@code str2} is null
     */
    public KafkaOutputTable(int i, int i2, String str, String str2) {
        super(i);
        this.queueSizeLogger = new QueueSizeLogger();
        this.recordSizeLogger = new QueueSizeLogger();
        this.random = new Random();
        this.topic = Objects.requireNonNull(str2);
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", Objects.requireNonNull(str));
        producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.properties = producerConfig;
        this.batchSize = i2;
        this.sign = "|KafkaOutputTable|" + str2 + "|" + Integer.toHexString(hashCode());
        this.partitionsDetector = Executors.newSingleThreadScheduledExecutor(Threads.threadsNamed("partitions_detector" + this.sign));
        this.threadPoolExecutor = new ThreadPoolExecutor(i, i, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1), new ThreadFactoryBuilder().setNameFormat("kafka-output-%d").build());
    }

    /**
     * Logs the current queue size and row count, then enqueues {@code table} for the workers.
     *
     * @param table the table to publish
     * @throws InterruptedException propagated from {@code putTable}
     */
    @Override // com.alibaba.tc.sp.output.OutputTable
    public void produce(Table table) throws InterruptedException {
        this.queueSizeLogger.logQueueSize("Kafka输出队列大小" + this.sign, this.arrayBlockingQueueList);
        this.recordSizeLogger.logRecordSize("Kafka输出队列行数" + this.sign, this.arrayBlockingQueueList);
        putTable(table);
    }

    /**
     * Queries the partition list of the topic and publishes it to {@link #allPartitions}.
     *
     * <p>A short-lived producer is used for the metadata lookup and is always closed afterwards,
     * so repeated calls do not accumulate open producers.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void detectPartitions() {
        KafkaProducer<Integer, String> metadataProducer = new KafkaProducer<Integer, String>(this.properties);
        try {
            List<PartitionInfo> partitionInfos = metadataProducer.partitionsFor(this.topic);
            int[] partitionIds = new int[partitionInfos.size()];
            int next = 0;
            for (PartitionInfo info : partitionInfos) {
                partitionIds[next++] = info.partition();
            }
            this.allPartitions = partitionIds;
        } finally {
            metadataProducer.close();
        }
    }

    /**
     * Detects the topic's partitions once, schedules a refresh every five seconds and submits
     * one publishing task per worker thread.
     *
     * <p>An exception from the initial detection propagates to the caller. Failures of the
     * periodic refresh are logged and the previous partition list stays in use.
     */
    public void start() {
        detectPartitions();
        this.partitionsDetector.scheduleWithFixedDelay(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                // An exception escaping a scheduled task cancels all of its future runs, so a
                // transient metadata failure must be contained here.
                try {
                    KafkaOutputTable.logger.info("{} partitions: {}", KafkaOutputTable.this.sign, KafkaOutputTable.this.allPartitions);
                    KafkaOutputTable.this.detectPartitions();
                } catch (Throwable th) {
                    KafkaOutputTable.logger.warn("partition detection failed" + KafkaOutputTable.this.sign, th);
                }
            }
        }, 5L, 5L, TimeUnit.SECONDS);
        for (int i = 0; i < this.thread; i++) {
            final int workerIndex = i;
            this.threadPoolExecutor.submit(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    KafkaOutputTable.this.runWorker(workerIndex);
                }
            });
        }
    }

    /**
     * Stops the periodic partition refresh and interrupts the publishing workers.
     */
    @Override // com.alibaba.tc.sp.output.OutputTable
    public void stop() {
        this.partitionsDetector.shutdownNow();
        this.threadPoolExecutor.shutdownNow();
    }

    /**
     * Worker loop: takes tables from the queue and publishes them until the thread is
     * interrupted or an error occurs. The worker's producer is closed on every exit path.
     *
     * @param workerIndex index of this worker, used to build a distinct {@code client.id}
     */
    private void runWorker(int workerIndex) {
        Properties workerConfig = (Properties) this.properties.clone();
        workerConfig.put("client.id", IpUtil.getIp() + "-" + workerIndex);
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(workerConfig);
        try {
            while (!Thread.interrupted()) {
                publish(producer, consume());
            }
        } catch (InterruptedException e) {
            // The interrupt flag is deliberately left cleared: the worker is terminating and the
            // producer still has to be closed below.
            logger.info("interrupted");
        } catch (Throwable th) {
            StreamProcessing.handleException(th);
        } finally {
            closeQuietly(producer);
        }
    }

    /**
     * Sends every row of {@code table} as one JSON record, flushing every {@link #batchSize}
     * rows and once more after the last row.
     */
    private void publish(KafkaProducer<Integer, String> producer, Table table) {
        List<Column> columns = table.getColumns();
        int keySeconds = (int) (System.currentTimeMillis() / 1000);
        for (int row = 0; row < table.size(); row++) {
            JsonObject json = new JsonObject();
            for (int col = 0; col < columns.size(); col++) {
                Column column = columns.get(col);
                Object value = column.get(row);
                if (value == null) {
                    // Null cells are omitted from the JSON document.
                    continue;
                }
                String name = column.name();
                if ((value instanceof String) || (value instanceof ByteArray)) {
                    json.addProperty(name, ScalarUtil.toStr(value));
                } else if ("__time__".equals(name)) {
                    // A non-string __time__ becomes the record key (in seconds), not a JSON field.
                    // The value carries over to later rows until overwritten or refreshed below.
                    keySeconds = (int) (((Long) value).longValue() / 1000);
                } else {
                    json.addProperty(name, (Number) value);
                }
            }
            producer.send(new ProducerRecord<Integer, String>(this.topic, pickPartition(), Integer.valueOf(keySeconds), json.toString()));
            // batchSize == 0 would make the modulo throw; treat it as "no intermediate flush".
            if (this.batchSize != 0 && row > 0 && row % this.batchSize == 0) {
                producer.flush();
                keySeconds = (int) (System.currentTimeMillis() / 1000);
            }
        }
        producer.flush();
    }

    /**
     * Picks a random partition from the latest detected list.
     *
     * @return a partition id, or {@code null} when no partition is currently known, in which
     *         case the choice is left to the Kafka client
     */
    private Integer pickPartition() {
        // Read the volatile field exactly once: the detector may swap in an array of a different
        // length at any time, and mixing two snapshots could yield an out-of-range index.
        int[] snapshot = this.allPartitions;
        if (snapshot == null || snapshot.length == 0) {
            return null;
        }
        return Integer.valueOf(snapshot[this.random.nextInt(snapshot.length)]);
    }

    /** Closes {@code producer}, logging instead of propagating any failure. */
    private static void closeQuietly(KafkaProducer<Integer, String> producer) {
        try {
            producer.close();
        } catch (Throwable th) {
            logger.warn("failed to close kafka producer", th);
        }
    }
}
