package com.alibaba.tc.window;

import com.alibaba.tc.Threads;
import com.alibaba.tc.exception.OutOfOrderException;
import com.alibaba.tc.function.AggTimeWindowFunction;
import com.alibaba.tc.function.TimeWindowFunction;
import com.alibaba.tc.table.Row;
import com.alibaba.tc.table.RowByTable;
import com.alibaba.tc.table.SlideTable;
import com.alibaba.tc.table.Table;
import com.alibaba.tc.table.TableBuilder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.http.annotation.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:com/alibaba/tc/window/SlideWindow.class */
public class SlideWindow extends TimeWindow {
    private static final Logger logger = LoggerFactory.getLogger(SlideWindow.class);
    private static List<SlideWindow> slideWindows = new ArrayList();
    private final long windowSizeDurationMs;
    private final long slideDurationMs;
    private final String[] partitionByColumnNames;
    private final TimeWindowFunction windowFunction;
    private final AggTimeWindowFunction aggTimeWindowFunction;
    private final String[] columnNames;
    private final Map<Thread, Map<List<Comparable>, SlideTable>> threadPartitionedTables;
    private final Map<Thread, WindowTime> windowTimeMap;
    private final String sign;
    private int timesExceed;
    private int timesBehind;
    private long maxGapExceed;
    private long maxGapBehind;
    private OutOfOrderException outOfOrderException;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/tc/window/SlideWindow$WindowTime.class */
    public static class WindowTime {
        long startTime;
        long lastDataTime;
        long lastDataSystemTime;

        private WindowTime() {
        }
    }

    private synchronized void warn(long j, long j2, OutOfOrderException outOfOrderException) {
        if (j < j2) {
            if (j - j2 < this.maxGapBehind) {
                this.maxGapBehind = j - j2;
            }
            this.timesBehind++;
        } else {
            if ((j - j2) - this.windowSizeDurationMs > this.maxGapExceed) {
                this.maxGapExceed = (j - j2) - this.windowSizeDurationMs;
            }
            this.timesExceed++;
        }
        this.outOfOrderException = outOfOrderException;
    }

    public SlideWindow(Duration duration, Duration duration2, String[] strArr, String str, AggTimeWindowFunction aggTimeWindowFunction, String... strArr2) {
        this(duration, duration2, strArr, str, null, aggTimeWindowFunction, StoreType.STORE_BY_COLUMN, strArr2);
    }

    public SlideWindow(Duration duration, Duration duration2, String[] strArr, String str, TimeWindowFunction timeWindowFunction, String... strArr2) {
        this(duration, duration2, strArr, str, timeWindowFunction, null, StoreType.STORE_BY_COLUMN, strArr2);
    }

    public SlideWindow(Duration duration, Duration duration2, String[] strArr, String str, AggTimeWindowFunction aggTimeWindowFunction, StoreType storeType, String... strArr2) {
        this(duration, duration2, strArr, str, null, aggTimeWindowFunction, storeType, strArr2);
    }

    public SlideWindow(Duration duration, Duration duration2, String[] strArr, String str, TimeWindowFunction timeWindowFunction, StoreType storeType, String... strArr2) {
        this(duration, duration2, strArr, str, timeWindowFunction, null, storeType, strArr2);
    }

    private SlideWindow(Duration duration, Duration duration2, String[] strArr, String str, TimeWindowFunction timeWindowFunction, AggTimeWindowFunction aggTimeWindowFunction, StoreType storeType, String... strArr2) {
        super(storeType, str);
        this.threadPartitionedTables = new ConcurrentHashMap();
        this.windowTimeMap = new ConcurrentHashMap();
        this.maxGapExceed = 0L;
        this.maxGapBehind = 0L;
        this.windowSizeDurationMs = ((Duration) Objects.requireNonNull(duration)).toMillis();
        this.slideDurationMs = ((Duration) Objects.requireNonNull(duration2)).toMillis();
        if (this.windowSizeDurationMs <= 0) {
            throw new IllegalArgumentException("windowSizeDuration should be greater than 0ms");
        }
        if (this.slideDurationMs <= 0) {
            throw new IllegalArgumentException("hopDuration should be greater than 0ms");
        }
        if (this.slideDurationMs > this.windowSizeDurationMs) {
            throw new IllegalArgumentException("hopDuration should be less or equal to windowSizeDuration");
        }
        this.partitionByColumnNames = (String[]) Objects.requireNonNull(strArr);
        if (strArr.length < 1) {
            throw new IllegalArgumentException("at least one partition by column");
        }
        this.windowFunction = timeWindowFunction;
        this.aggTimeWindowFunction = aggTimeWindowFunction;
        this.columnNames = (String[]) Objects.requireNonNull(strArr2);
        if (this.columnNames.length < 1) {
            throw new IllegalArgumentException("at least one returned column");
        }
        this.sign = "HopWindow|" + String.join(",", strArr) + "|" + str + "|" + duration + "|" + duration2;
        slideWindows.add(this);
    }

    private void enterWindow(Table table, int i, Map<List<Comparable>, SlideTable> map) {
        getPartitionedSlideTable(genPartitionKey(table, i, this.partitionByColumnNames), table, map, this.timeColumnName, this.storeType).addRow(table, i);
    }

    private void triggerAllWindow(TableBuilder tableBuilder, WindowTime windowTime, Map<List<Comparable>, SlideTable> map) {
        ArrayList arrayList = new ArrayList();
        for (List<Comparable> list : map.keySet()) {
            SlideTable slideTable = map.get(list);
            appendRow(tableBuilder, list, slideTable.rows(), windowTime.startTime, windowTime.startTime + this.windowSizeDurationMs);
            slideTable.removeLessThan(windowTime.startTime + this.slideDurationMs);
            int size = slideTable.size();
            if (0 == size) {
                arrayList.add(list);
            } else if (size < 0) {
                throw new IllegalStateException(String.format("partitionedTable.size()：%d", Integer.valueOf(slideTable.size())));
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            map.remove((List) it.next());
        }
    }

    private void appendRow(TableBuilder tableBuilder, List<Comparable> list, List<Row> list2, long j, long j2) {
        if (this.windowFunction != null) {
            appendRows(tableBuilder, this.windowFunction.transform(list, list2, j, j2));
        } else {
            appendRow(tableBuilder, this.aggTimeWindowFunction.agg(list, list2, j, j2));
        }
    }

    private void triggerOneElemWindow(TableBuilder tableBuilder, Table table, int i, long j) {
        List<Comparable> genPartitionKey = genPartitionKey(table, i, this.partitionByColumnNames);
        long j2 = (j / this.windowSizeDurationMs) * this.windowSizeDurationMs;
        RowByTable rowByTable = new RowByTable(table, i);
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(rowByTable);
        appendRow(tableBuilder, genPartitionKey, arrayList, j2, j2 + this.windowSizeDurationMs);
    }

    public Table slide(List<Table> list) {
        checkTablesSize(list);
        TableBuilder newTableBuilder = newTableBuilder(this.columnNames);
        List<Table> watermark = watermark(list);
        Thread currentThread = Thread.currentThread();
        Map<List<Comparable>, SlideTable> map = this.threadPartitionedTables.get(currentThread);
        if (null == map) {
            map = new HashMap();
            this.threadPartitionedTables.put(currentThread, map);
        }
        WindowTime windowTime = this.windowTimeMap.get(currentThread);
        boolean z = true;
        for (Table table : watermark) {
            if (table.size() > 0) {
                z = false;
                windowTime = hopOneTable(newTableBuilder, table, windowTime, currentThread, map);
            }
        }
        if (z && windowTime != null) {
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - windowTime.lastDataSystemTime > this.noDataDelay) {
                long j = (currentTimeMillis - windowTime.lastDataSystemTime) + windowTime.lastDataTime;
                if (j >= windowTime.startTime + this.windowSizeDurationMs) {
                    triggerAllWindow(newTableBuilder, windowTime, map);
                    windowTime.startTime = (j / this.windowSizeDurationMs) * this.windowSizeDurationMs;
                    logger.info("no data window advanced, now: {}, lastDataSystemTime: {}, partitionByColumnNames: {}, timeColumnName: {}", new Object[]{Long.valueOf(currentTimeMillis), Long.valueOf(windowTime.lastDataSystemTime), this.partitionByColumnNames, this.timeColumnName});
                    return newTableBuilder.build();
                }
            }
            return newTableBuilder.build();
        }
        return newTableBuilder.build();
    }

    private WindowTime hopOneTable(TableBuilder tableBuilder, Table table, WindowTime windowTime, Thread thread, Map<List<Comparable>, SlideTable> map) {
        long longValue = ((Long) table.getColumn(this.timeColumnName).get(0)).longValue();
        if (null == windowTime) {
            windowTime = new WindowTime();
            this.windowTimeMap.put(thread, windowTime);
            windowTime.startTime = (longValue / this.windowSizeDurationMs) * this.windowSizeDurationMs;
        }
        long currentTimeMillis = System.currentTimeMillis();
        windowTime.lastDataTime = longValue;
        windowTime.lastDataSystemTime = currentTimeMillis;
        for (int i = 0; i < table.size(); i++) {
            try {
                longValue = ((Long) table.getColumn(this.timeColumnName).get(i)).longValue();
                if (longValue >= windowTime.startTime + this.windowSizeDurationMs) {
                    triggerAllWindow(tableBuilder, windowTime, map);
                    windowTime.startTime += this.slideDurationMs;
                    if (longValue >= windowTime.startTime + this.windowSizeDurationMs) {
                        triggerOneElemWindow(tableBuilder, table, i, longValue);
                        warn(longValue, windowTime.startTime, null);
                    } else {
                        enterWindow(table, i, map);
                    }
                } else if (longValue < windowTime.startTime) {
                    triggerOneElemWindow(tableBuilder, table, i, longValue);
                    warn(longValue, windowTime.startTime, null);
                } else {
                    enterWindow(table, i, map);
                }
            } catch (OutOfOrderException e) {
                warn(longValue, windowTime.startTime, e);
            }
        }
        return windowTime;
    }

    @Override // com.alibaba.tc.window.TimeWindow, com.alibaba.tc.window.Window
    public List<Row> getRows(List<Comparable> list) {
        SlideTable slideTable = this.threadPartitionedTables.get(Thread.currentThread()).get(list);
        if (null == slideTable) {
            return null;
        }
        return slideTable.rows();
    }

    static {
        new ScheduledThreadPoolExecutor(1, Threads.threadsNamed("HopWindowLogger")).scheduleWithFixedDelay(new Runnable() { // from class: com.alibaba.tc.window.SlideWindow.1
            @Override // java.lang.Runnable
            public void run() {
                for (SlideWindow slideWindow : SlideWindow.slideWindows) {
                    synchronized (slideWindow) {
                        if (slideWindow.timesExceed > 0) {
                            SlideWindow.logger.warn("{}: dataTime exceed window start, times: {}, maxGap: {}, use watermark to avoid(windowSize too small or noDataDelay too small also lead to this case). {}", new Object[]{slideWindow.sign, Integer.valueOf(slideWindow.timesExceed), Long.valueOf(slideWindow.maxGapExceed), slideWindow.outOfOrderException});
                            slideWindow.timesExceed = 0;
                        }
                        if (slideWindow.timesBehind > 0) {
                            SlideWindow.logger.warn("{}: dataTime behind window start, times: {}, maxGap: {}, use watermark to avoid(windowSize too small or noDataDelay too small also lead to this case). {}", new Object[]{slideWindow.sign, Integer.valueOf(slideWindow.timesBehind), Long.valueOf(slideWindow.maxGapBehind), slideWindow.outOfOrderException});
                            slideWindow.timesBehind = 0;
                        }
                        slideWindow.outOfOrderException = null;
                    }
                }
            }
        }, 5L, 5L, TimeUnit.SECONDS);
    }
}
