package com.alibaba.tc.sp.output;

import com.alibaba.tc.sp.QueueSizeLogger;
import com.alibaba.tc.sp.StreamProcessing;
import com.alibaba.tc.table.Column;
import com.alibaba.tc.table.Table;
import com.alibaba.tc.util.ScalarUtil;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogItem;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/tc/sp/output/SlsOutputTable.class */
/**
 * Output table that forwards rows to an Aliyun SLS logstore.
 *
 * <p>Tables handed to {@link #produce(Table)} are queued via {@code putTable}. After
 * {@link #start()} has been called, worker tasks take tables with {@code consume()},
 * convert every row into a {@link LogItem} and send the items with
 * {@code Client.PutLogs} in batches of {@code batchSize} items.
 */
public class SlsOutputTable extends AbstractOutputTable {
    private static final Logger logger = LoggerFactory.getLogger(SlsOutputTable.class);

    /** Batch size used by the five-argument constructor. */
    private static final int DEFAULT_BATCH_SIZE = 40000;

    /** Name of the column that sets the log time instead of becoming a regular field. */
    private static final String TIME_COLUMN = "__time__";

    private final String endPoint;
    private final String accessId;
    private final String accessKey;
    private final String project;
    private final String logstore;
    private final int batchSize;
    /** Suffix appended to the queue-size log labels to tell instances apart. */
    private final String sign;
    private final ThreadPoolExecutor threadPoolExecutor;
    private final QueueSizeLogger queueSizeLogger;
    protected final QueueSizeLogger recordSizeLogger;

    /**
     * Creates an output table with one worker per available processor and a batch
     * size of 40000 log items.
     *
     * @param endPoint  SLS endpoint, must not be null
     * @param accessId  access id passed to the SLS client, must not be null
     * @param accessKey access key passed to the SLS client, must not be null
     * @param project   target SLS project, must not be null
     * @param logstore  target SLS logstore, must not be null
     */
    public SlsOutputTable(String endPoint, String accessId, String accessKey, String project, String logstore) {
        this(Runtime.getRuntime().availableProcessors(), DEFAULT_BATCH_SIZE, endPoint, accessId, accessKey, project, logstore);
    }

    /**
     * Creates an output table.
     *
     * @param thread    value passed to the superclass constructor and used as the pool size
     * @param batchSize number of log items that triggers a {@code PutLogs} call; the
     *                  comparison is an exact match, so a value that is never reached
     *                  (for example zero or negative) means each table is sent in one call
     * @param endPoint  SLS endpoint, must not be null
     * @param accessId  access id passed to the SLS client, must not be null
     * @param accessKey access key passed to the SLS client, must not be null
     * @param project   target SLS project, must not be null
     * @param logstore  target SLS logstore, must not be null
     * @throws NullPointerException if any of the string arguments is null
     */
    public SlsOutputTable(int thread, int batchSize, String endPoint, String accessId, String accessKey, String project, String logstore) {
        super(thread);
        this.queueSizeLogger = new QueueSizeLogger();
        this.recordSizeLogger = new QueueSizeLogger();
        this.endPoint = Objects.requireNonNull(endPoint);
        this.accessId = Objects.requireNonNull(accessId);
        this.accessKey = Objects.requireNonNull(accessKey);
        this.project = Objects.requireNonNull(project);
        this.logstore = Objects.requireNonNull(logstore);
        this.batchSize = batchSize;
        this.sign = "|SlsOutputTable|" + project + "|" + logstore + "|" + Integer.toHexString(hashCode());
        this.threadPoolExecutor = new ThreadPoolExecutor(
                thread,
                thread,
                0L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadFactoryBuilder().setNameFormat("sls-output-%d").build());
    }

    /**
     * Logs the current queue and record sizes, then queues {@code table} for the workers.
     *
     * @param table the table to send
     * @throws InterruptedException propagated from {@code putTable}
     */
    @Override // com.alibaba.tc.sp.output.OutputTable
    public void produce(Table table) throws InterruptedException {
        this.queueSizeLogger.logQueueSize("Sls输出队列大小" + this.sign, this.arrayBlockingQueueList);
        this.recordSizeLogger.logRecordSize("Sls输出队列行数" + this.sign, this.arrayBlockingQueueList);
        putTable(table);
    }

    /**
     * Submits one worker task per configured thread. Each worker creates its own SLS
     * client and loops until its thread is interrupted or sending fails.
     */
    public void start() {
        for (int i = 0; i < this.thread; i++) {
            this.threadPoolExecutor.submit(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    Client client = new Client(SlsOutputTable.this.endPoint, SlsOutputTable.this.accessId, SlsOutputTable.this.accessKey);
                    while (!Thread.interrupted()) {
                        try {
                            Table table = SlsOutputTable.this.consume();
                            List<Column> columns = table.getColumns();
                            List<LogItem> batch = new ArrayList<>();
                            for (int row = 0; row < table.size(); row++) {
                                batch.add(toLogItem(columns, row));
                                if (batch.size() == SlsOutputTable.this.batchSize) {
                                    client.PutLogs(SlsOutputTable.this.project, SlsOutputTable.this.logstore, "", batch, "");
                                    batch.clear();
                                }
                            }
                            // Send whatever is left over after the last full batch.
                            if (!batch.isEmpty()) {
                                client.PutLogs(SlsOutputTable.this.project, SlsOutputTable.this.logstore, "", batch, "");
                            }
                        } catch (InterruptedException e) {
                            SlsOutputTable.logger.info("interrupted");
                            // Restore the flag so code further up the stack can still observe the interrupt.
                            Thread.currentThread().interrupt();
                            return;
                        } catch (Throwable th) {
                            // Any other failure is reported and ends this worker.
                            StreamProcessing.handleException(th);
                            return;
                        }
                    }
                }
            });
        }
    }

    /**
     * Builds the log item for one row. Null cells are skipped; the {@code __time__}
     * column sets the item's time, every other column becomes a string field.
     *
     * @param columns the columns of the table being sent
     * @param row     index of the row to convert
     * @return the log item for that row
     */
    private static LogItem toLogItem(List<Column> columns, int row) {
        LogItem logItem = new LogItem();
        for (int c = 0; c < columns.size(); c++) {
            Column column = columns.get(c);
            Comparable value = column.get(row);
            if (value == null) {
                continue;
            }
            String name = column.name();
            if (TIME_COLUMN.equals(name)) {
                // NOTE(review): the Long is divided by 1000 before SetTime, so it is
                // presumably epoch milliseconds converted to seconds; confirm with producers.
                logItem.SetTime((int) (((Long) value).longValue() / 1000));
            } else {
                logItem.PushBack(name, ScalarUtil.toStr(value));
            }
        }
        return logItem;
    }

    /** Stops the workers by calling {@code shutdownNow()} on the thread pool. */
    @Override // com.alibaba.tc.sp.output.OutputTable
    public void stop() {
        this.threadPoolExecutor.shutdownNow();
    }
}
