package com.srotya.minuteman.wal;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;
import org.apache.curator.utils.ZKPaths;

/* loaded from: input_file:com/srotya/minuteman/wal/MappedWAL.class */
public class MappedWAL implements WAL {
    /** Class-wide logger. */
    private static final Logger logger = Logger.getLogger(MappedWAL.class.getName());
    /** Memory-mapped view of the segment file currently open for writing; null until the first segment is opened. */
    private MappedByteBuffer currentWrite;
    /** Number of bytes mapped for every segment file; also the divisor used to split raw offsets. */
    private int segmentSize;
    /** Incremented after each segment is opened, so it is one past the id of the segment being written. */
    private volatile int segmentCounter;
    /** File handle backing {@link #currentWrite}. */
    private RandomAccessFile raf;
    /** Directory that holds the segment files and the ".md" metadata file. */
    private String walDirectory;
    /** Per-follower read state, keyed by follower id. */
    private Map<String, MappedWALFollower> followerMap;
    /** Number of writes since the last counter-triggered flush. */
    private volatile int counter;
    /** Write count that triggers a flush; when -1 the write path never flushes. */
    private int maxCounter;
    /** Maximum lag (in bytes) a follower may have and still be flagged as an ISR. */
    private int isrThreshold;
    /** Gate checked by deleteWALSegments(): segments are only deleted while this is true. */
    private AtomicBoolean walDeletion;
    /** File handle of the ".md" metadata file. */
    private RandomAccessFile metaraf;
    /** Mapped view of the metadata file; the commit offset is stored as a long at position 0. */
    private MappedByteBuffer metaBuf;
    /** Lock from which the {@link #read} and {@link #write} locks below are derived. */
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    /** Shared lock taken while looking up / registering a follower in read(). */
    private ReentrantReadWriteLock.ReadLock read = this.lock.readLock();
    /** Exclusive lock taken by the write, flush, rotation, commit-offset and deletion paths. */
    private ReentrantReadWriteLock.WriteLock write = this.lock.writeLock();
    /** Last commit offset; restored from the metadata file when one already exists. */
    private volatile long commitOffset = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/srotya/minuteman/wal/MappedWAL$MappedWALFollower.class */
    /**
     * Mutable per-follower read state: the last requested raw offset, the
     * segment file/buffer the follower is reading from, and its ISR flag.
     * Instances are only created by the enclosing class.
     */
    public class MappedWALFollower {
        private volatile long offset;
        private RandomAccessFile reader;
        private MappedByteBuffer buf;
        private volatile boolean isr;

        private MappedWALFollower() {
        }

        /** @return the file handle of the segment this follower reads, or null if none is set */
        public RandomAccessFile getReader() {
            return reader;
        }

        /** @param segmentReader file handle of the segment this follower reads */
        public void setReader(RandomAccessFile segmentReader) {
            reader = segmentReader;
        }

        /** @return the mapped view of the follower's segment, or null if none is set */
        public MappedByteBuffer getBuf() {
            return buf;
        }

        /** @param segmentBuffer mapped view of the follower's segment; null resets it */
        public void setBuf(MappedByteBuffer segmentBuffer) {
            buf = segmentBuffer;
        }

        /** @return the raw offset last recorded for this follower */
        public long getOffset() {
            return offset;
        }

        /** @param rawOffset raw offset to record for this follower */
        public void setOffset(long rawOffset) {
            offset = rawOffset;
        }

        /** @return the current value of the ISR flag */
        public boolean isIsr() {
            return isr;
        }

        /** @param inSync new value of the ISR flag */
        public void setIsr(boolean inSync) {
            isr = inSync;
        }
    }

    /**
     * Reads the WAL settings from {@code map}, prepares the WAL directory,
     * recovers the metadata file, opens the first write segment and finally
     * schedules the periodic ISR check.
     *
     * @param map configuration; missing keys fall back to the defaults used below
     * @param scheduledExecutorService executor that runs {@link #updateISR()} periodically
     * @throws IOException if the WAL directory cannot be created or a WAL file cannot be opened/mapped
     */
    @Override // com.srotya.minuteman.wal.WAL
    public void configure(Map<String, String> map, ScheduledExecutorService scheduledExecutorService) throws IOException {
        this.walDirectory = map.getOrDefault(WAL.WAL_DIR, WAL.DEFAULT_WAL_DIR);
        logger.info("Configuring WAL directory:" + this.walDirectory);
        File directory = new File(this.walDirectory);
        // mkdirs() returns false both on failure and when the directory already exists
        if (!directory.mkdirs() && !directory.isDirectory()) {
            throw new IOException("Unable to create WAL directory:" + this.walDirectory);
        }
        this.segmentSize = Integer.parseInt(map.getOrDefault(WAL.WAL_SEGMENT_SIZE, String.valueOf(WAL.DEFAULT_WALL_SIZE)));
        logger.info("Configuring segment size:" + this.segmentSize);
        this.maxCounter = Integer.parseInt(map.getOrDefault(WAL.WAL_SEGMENT_FLUSH_COUNT, String.valueOf(-1)));
        logger.info("Configuring max flush counter to:" + this.maxCounter);
        this.isrThreshold = Integer.parseInt(map.getOrDefault(WAL.WAL_ISR_THRESHOLD, String.valueOf(8388608)));
        this.followerMap = new ConcurrentHashMap<>();
        this.walDeletion = new AtomicBoolean(Boolean.parseBoolean(map.getOrDefault(WAL.WAL_DELETION_DISABLED, "true")));
        logger.info("WAL deletion is set to:" + this.walDeletion.get());
        // Parse the schedule up front so a malformed value fails before any file is opened.
        int isrCheckDelay = Integer.parseInt(map.getOrDefault(WAL.WAL_ISRCHECK_DELAY, "1000"));
        int isrCheckFrequency = Integer.parseInt(map.getOrDefault(WAL.WAL_ISRCHECK_FREQUENCY, "10000"));
        initAndRecoverMetaRaf();
        checkAndRotateSegment(0);
        // Schedule only after the write buffer exists: updateISR() dereferences it.
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                updateISR();
            } catch (RuntimeException e) {
                // An escaping exception would cancel every subsequent run of this task.
                logger.log(java.util.logging.Level.SEVERE, "ISR update failed for wal:" + this.walDirectory, e);
            }
        }, isrCheckDelay, isrCheckFrequency, TimeUnit.MILLISECONDS);
    }

    /**
     * Re-evaluates the ISR flag of every follower by comparing its recorded
     * offset with the current write position, then refreshes the commit offset.
     * A follower lagging by more than {@code isrThreshold} bytes is flagged as
     * not in sync; every other follower is flagged as in sync.
     */
    public void updateISR() {
        // Widen before multiplying: segment index * segment size can exceed Integer.MAX_VALUE.
        long position = this.currentWrite.position() + ((long) (this.segmentCounter - 1)) * this.segmentSize;
        for (Map.Entry<String, MappedWALFollower> entry : this.followerMap.entrySet()) {
            MappedWALFollower value = entry.getValue();
            String key = entry.getKey();
            if (position - value.getOffset() > this.isrThreshold) {
                value.setIsr(false);
                logger.fine("Follower no longer an ISR:" + key + " wal:" + this.walDirectory + " current offset:" + position + " follower:" + value.getOffset() + " thresholdDiff:" + (position - value.getOffset()) + " isrThreshold:" + this.isrThreshold);
            } else {
                value.setIsr(true);
                logger.fine("Follower now an ISR:" + key + " wal:" + this.walDirectory + " with offset(" + getCommitOffset() + ")");
            }
        }
        updateMinOffset();
    }

    /**
     * Opens and maps the ".md" metadata file in the WAL directory. When the
     * file already had content, the commit offset is restored from its first
     * eight bytes; otherwise a zero commit offset is written.
     *
     * @throws IOException if the metadata file cannot be opened or mapped
     */
    private void initAndRecoverMetaRaf() throws IOException {
        this.metaraf = new RandomAccessFile(this.walDirectory + "/.md", "rwd");
        // Must be sampled before mapping: the length is checked first, exactly as before.
        boolean hasExistingMetadata = this.metaraf.length() > 0;
        this.metaBuf = this.metaraf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0L, 1024L);
        if (hasExistingMetadata) {
            logger.info("Found md file with commit offset");
            this.commitOffset = this.metaBuf.getLong();
            return;
        }
        this.metaBuf.putLong(0L);
    }

    /**
     * Forces the current write buffer, if any, to disk while holding the write lock.
     *
     * @throws IOException declared by the interface; not thrown by the visible code
     */
    @Override // com.srotya.minuteman.wal.WAL
    public void flush() throws IOException {
        this.write.lock();
        try {
            if (this.currentWrite != null) {
                logger.finer("Flushing buffer to disk");
                this.currentWrite.force();
            }
        } finally {
            // Release even if force() fails, otherwise every later writer blocks forever.
            this.write.unlock();
        }
    }

    /**
     * Flushes pending data and closes the write segment, every follower's open
     * reader and the metadata file, all under the write lock.
     *
     * @throws IOException if closing one of the underlying files fails
     */
    @Override // com.srotya.minuteman.wal.WAL
    public void close() throws IOException {
        this.write.lock();
        try {
            flush();
            if (this.raf != null) {
                this.raf.close();
            }
            for (MappedWALFollower follower : this.followerMap.values()) {
                RandomAccessFile reader = follower.getReader();
                if (reader != null) {
                    reader.close();
                }
            }
            // The metadata file was previously never closed; persist and release it too.
            if (this.metaBuf != null) {
                this.metaBuf.force();
            }
            if (this.metaraf != null) {
                this.metaraf.close();
            }
        } finally {
            this.write.unlock();
        }
    }

    /**
     * @return the position of the current write buffer, or -1 when no segment is open yet
     * @throws IOException declared by the interface; not thrown by the visible code
     */
    @Override // com.srotya.minuteman.wal.WAL
    public int getOffset() throws IOException {
        return this.currentWrite != null ? this.currentWrite.position() : -1;
    }

    /**
     * Appends one length-prefixed record to the current segment, rotating to a
     * new segment first when the record does not fit. After the append, the
     * buffer position is stored in the first four bytes of the segment.
     *
     * @param bArr record payload
     * @param z when true (and flushing is enabled, i.e. {@code maxCounter != -1}) flush immediately
     * @throws IOException if rotating to a new segment fails
     */
    @Override // com.srotya.minuteman.wal.WAL
    public void write(byte[] bArr, boolean z) throws IOException {
        this.write.lock();
        try {
            // Rotation does file I/O and may throw; it must run inside the try so the lock is released.
            checkAndRotateSegment(bArr.length + 4);
            this.counter++;
            this.currentWrite.putInt(bArr.length);
            this.currentWrite.put(bArr);
            // Segment header (offset 0) always holds the end-of-data position.
            this.currentWrite.putInt(0, this.currentWrite.position());
            if (this.maxCounter != -1 && (z || this.counter == this.maxCounter)) {
                flush();
                this.counter = 0;
            }
            logger.finest("Wrote data:" + bArr.length + " at new offset:" + this.currentWrite.position() + " new file:" + this.segmentCounter);
        } finally {
            this.write.unlock();
        }
    }

    /**
     * Serves a read request for a follower, registering the follower on first use.
     *
     * @param str follower id
     * @param j raw offset the follower wants to read from
     * @param i byte budget passed through to the record reader
     * @param z when true, reads are capped at the commit offset
     * @return the records read together with the next offset and the current
     *         commit offset; when the requested segment file is missing, a
     *         result that only carries the next offset
     * @throws IOException if a segment file cannot be opened, mapped or closed
     */
    @Override // com.srotya.minuteman.wal.WAL
    public WALRead read(String str, long j, int i, boolean z) throws IOException {
        MappedWALFollower follower;
        this.read.lock();
        try {
            // Atomic create-if-missing: the read lock is shared, so a get-then-put
            // could let two concurrent readers register different follower objects.
            follower = this.followerMap.computeIfAbsent(str, key -> {
                logger.info("No follower entry, creating one for:" + key);
                return new MappedWALFollower();
            });
        } finally {
            this.read.unlock();
        }
        WALRead result = new WALRead();
        int segmentId = rawOffsetToSegmentId(j);
        checkAndCleanupSegment(str, follower, segmentId, rawOffsetToSegmentId(follower.getOffset()));
        if (follower.getBuf() == null) {
            WALRead skipped = initializeBuffer(follower, segmentId, str, result);
            if (skipped != null) {
                // Requested segment no longer exists; the caller is told where to continue.
                return skipped;
            }
        }
        follower.setOffset(j);
        // The segment header holds the end-of-data position written by write().
        int readLimit = follower.getBuf().getInt(0);
        updateMinOffset();
        if (z && segmentId >= rawOffsetToSegmentId(this.commitOffset)) {
            readLimit = rawOffsetToSegmentOffset(this.commitOffset);
            logger.fine("Reading committed pos:" + readLimit + " segmentid:" + segmentId + " commit offset:" + this.commitOffset + " req:" + (this.commitOffset - j));
        }
        readDataAndUpdateOffset(str, j, i, follower, result, segmentId, readLimit, z);
        result.setCommitOffset(getCommitOffset());
        return result;
    }

    /**
     * Reads records from the follower's segment buffer up to {@code i3} and
     * records in {@code wALRead} the raw offset the follower should request next.
     *
     * @param str follower id (used for logging only)
     * @param j raw offset requested by the follower
     * @param i byte budget for this read
     * @param mappedWALFollower follower whose buffer is read
     * @param wALRead result object receiving the data and the next offset
     * @param i2 segment id derived from {@code j}
     * @param i3 position inside the segment up to which data may be read
     * @param z when true, the next offset is capped at the commit offset
     */
    private void readDataAndUpdateOffset(String str, long j, int i, MappedWALFollower mappedWALFollower, WALRead wALRead, int i2, int i3, boolean z) {
        MappedByteBuffer buf = mappedWALFollower.getBuf();
        int rawOffsetToSegmentOffset = rawOffsetToSegmentOffset(j);
        buf.position(rawOffsetToSegmentOffset);
        if (rawOffsetToSegmentOffset < i3) {
            readDataTillPos(j, i, wALRead, i2, i3, buf, rawOffsetToSegmentOffset);
        }
        if (i3 != buf.position() || this.segmentCounter - 1 <= i2) {
            // Widen before multiplying: segment id * segment size can exceed Integer.MAX_VALUE.
            long position = ((long) i2) * this.segmentSize + buf.position();
            logger.fine("Follower(" + str + ") has more data to read from this(" + i2 + ") segment; moving to next offset(" + position + ") pos(" + i3 + ") bufpos(" + buf.position() + ")");
            wALRead.setNextOffset(position);
        } else {
            // Segment exhausted and a newer one exists: continue right after the next segment's 4-byte header.
            long j2 = ((long) (i2 + 1)) * this.segmentSize + 4;
            logger.fine("Follower(" + str + ") doesn't have any more data to read from this segment, incrementing segment(" + i2 + "); segmentCounter:" + this.segmentCounter + " offset(" + j + ") moving to next offset(" + j2 + ") pos(" + i3 + ") bufpos(" + buf.position() + ")");
            wALRead.setNextOffset(j2);
        }
        if (z && wALRead.getNextOffset() > this.commitOffset) {
            wALRead.setNextOffset(this.commitOffset);
        }
    }

    /**
     * Reads consecutive length-prefixed records starting at {@code i4} and
     * hands them to {@code wALRead}. At least one record is always read;
     * reading stops once more than {@code i} bytes were consumed or the
     * cursor reaches {@code i3}.
     *
     * @param j raw offset requested (logging only)
     * @param i byte budget for this read
     * @param wALRead result object receiving the records
     * @param i2 segment id (logging only)
     * @param i3 position at which reading stops
     * @param byteBuffer buffer to read from
     * @param i4 position of the first record
     */
    private void readDataTillPos(long j, int i, WALRead wALRead, int i2, int i3, ByteBuffer byteBuffer, int i4) {
        ArrayList<byte[]> records = new ArrayList<>();
        int consumed = 0;
        int cursor = i4;
        while (true) {
            byteBuffer.position(cursor);
            int length = byteBuffer.getInt();
            byte[] payload = new byte[length];
            byteBuffer.get(payload);
            records.add(payload);
            // Each record occupies its payload plus the 4-byte length prefix.
            consumed += length + 4;
            cursor = i4 + consumed;
            if (consumed > i || cursor >= i3) {
                break;
            }
        }
        logger.finest("//error:" + i4 + " l:" + consumed + " r:" + byteBuffer.remaining() + " t:" + cursor + " r:" + j + " s:" + i2 + " p:" + i3 + " s:" + records.size() + " b:" + byteBuffer.position());
        wALRead.setData(records);
    }

    /**
     * Opens and maps the segment file {@code i} for the follower.
     *
     * @param mappedWALFollower follower that receives the reader and buffer
     * @param i id of the segment to open
     * @param str follower id (logging only)
     * @param wALRead result object used when the segment is missing
     * @return {@code null} when the segment was mapped; otherwise {@code wALRead}
     *         with its next offset pointing just past the header of segment
     *         {@code i + 1}, because the requested file no longer exists
     * @throws IOException if the segment file cannot be opened or mapped
     */
    private WALRead initializeBuffer(MappedWALFollower mappedWALFollower, int i, String str, WALRead wALRead) throws IOException {
        File file = new File(getSegmentFileName(this.walDirectory, i));
        if (!file.exists() && i < this.segmentCounter) {
            // Skip to the first record of the next segment: (id + 1) * size + header.
            // The previous expression added the segment id to a byte count instead.
            long j = ((long) (i + 1)) * this.segmentSize + 4;
            logger.warning("Follower(" + str + ") requested file(" + i + "=" + file.getAbsolutePath() + ") doesn't exist, incrementing next(" + j + ")");
            wALRead.setNextOffset(j);
            return wALRead;
        }
        logger.fine("Follower(" + str + "), opening new wal segment to read:" + file.getAbsolutePath());
        RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");
        mappedWALFollower.setReader(randomAccessFile);
        mappedWALFollower.setBuf(randomAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, 0L, file.length()));
        // Consume the 4-byte segment header so the buffer is positioned at the first record.
        mappedWALFollower.getBuf().getInt();
        return null;
    }

    /**
     * When the requested segment differs from the one the follower was last
     * reading, drops the follower's buffer, closes its open reader and runs
     * segment deletion. Does nothing when both ids match.
     *
     * @param str follower id (logging only)
     * @param mappedWALFollower follower whose state is reset
     * @param i requested segment id
     * @param i2 segment id of the follower's previous offset
     * @throws IOException if closing the reader fails
     */
    private void checkAndCleanupSegment(String str, MappedWALFollower mappedWALFollower, int i, int i2) throws IOException {
        if (i == i2) {
            return;
        }
        logger.info("Follower file(" + i2 + ") is doesn't match requested file id(" + i + "), reseting buffer & file id");
        mappedWALFollower.setBuf(null);
        RandomAccessFile openReader = mappedWALFollower.getReader();
        if (openReader != null) {
            logger.finer("Follower(" + str + ") has existing open file, now closing");
            openReader.close();
        }
        deleteWALSegments();
    }

    /**
     * Sets the commit offset to the smallest offset among ISR followers.
     * Leaves it untouched when no follower is currently flagged as ISR.
     */
    private void updateMinOffset() {
        this.write.lock();
        try {
            long minimumOffset = getMinimumOffset();
            // Long.MAX_VALUE is the "no ISR follower found" sentinel of getMinimumOffset().
            if (minimumOffset != Long.MAX_VALUE) {
                setCommitOffset(minimumOffset);
            }
        } finally {
            this.write.unlock();
        }
    }

    /**
     * Opens a new write segment when none is open yet or when fewer than
     * {@code i} bytes remain in the current one. The previous segment, if any,
     * is forced to disk and closed first. On the very first call the segment
     * counter is recovered from the files found in the WAL directory.
     *
     * @param i number of bytes the caller is about to write
     * @throws IOException if a segment file cannot be closed, opened or mapped
     */
    private void checkAndRotateSegment(int i) throws IOException {
        if (this.currentWrite != null && this.currentWrite.remaining() >= i) {
            return;
        }
        this.write.lock();
        try {
            if (this.raf != null) {
                logger.fine("Flushing current write buffer");
                this.currentWrite.force();
                logger.fine("Closing write access to current segment file:" + getSegmentFileName(this.walDirectory, this.segmentCounter));
                this.raf.close();
            }
            boolean recovering = false;
            if (this.currentWrite == null) {
                this.segmentCounter = getLastSegmentCounter(this.walDirectory);
                // NOTE(review): recovery only triggers for counters above 1, so an existing
                // segment 0 or 1 is re-initialised below - confirm this is intended.
                if (this.segmentCounter > 1) {
                    recovering = true;
                    logger.info("Existing segment file detected; segment counter set to:" + this.segmentCounter);
                }
            }
            String segmentFile = getSegmentFileName(this.walDirectory, this.segmentCounter);
            this.raf = new RandomAccessFile(new File(segmentFile), "rwd");
            this.currentWrite = this.raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0L, this.segmentSize);
            logger.info("Creating new segment file:" + segmentFile);
            if (recovering) {
                // Header value is the end-of-data position stored by write().
                int lastWritten = this.currentWrite.getInt(0);
                if (this.currentWrite.limit() > getCommitOffset()) {
                    this.currentWrite.position(rawOffsetToSegmentOffset(getCommitOffset()));
                } else {
                    this.currentWrite.position(lastWritten);
                }
                logger.info("Found existing WAL:" + segmentFile + ", forwarding offset to:" + getCommitOffset());
            } else {
                // Fresh segment: reserve the 4-byte header, leaving the position at 4.
                this.currentWrite.putInt(0);
            }
            logger.fine("Rotating segment file:" + segmentFile);
            this.segmentCounter++;
        } finally {
            // The I/O above can throw; without this the write lock would stay held forever.
            this.write.unlock();
        }
    }

    /**
     * @param j raw offset
     * @return the remainder of {@code j} divided by the segment size, i.e. the position inside its segment
     */
    public int rawOffsetToSegmentOffset(long j) {
        long positionInSegment = j % this.segmentSize;
        return (int) positionInSegment;
    }

    /**
     * @param j raw offset
     * @return {@code j} divided by the segment size, i.e. the id of the segment containing it
     */
    public int rawOffsetToSegmentId(long j) {
        long segmentId = j / this.segmentSize;
        return (int) segmentId;
    }

    /**
     * Builds the path of a segment file: the directory, a path separator, the
     * segment id zero-padded to 12 digits, and the ".wal" extension.
     *
     * @param str WAL directory
     * @param i segment id
     * @return path of the segment file
     */
    public static String getSegmentFileName(String str, int i) {
        String paddedId = String.format("%012d", i);
        return str + ZKPaths.PATH_SEPARATOR + paddedId + ".wal";
    }

    /**
     * Finds the highest segment id among the ".wal" files of a directory.
     *
     * @param str WAL directory
     * @return the largest numeric segment id found, or 0 when the directory is
     *         missing, unreadable or contains no segment file
     */
    public static int getLastSegmentCounter(String str) {
        File[] segments = new File(str).listFiles((dir, name) -> name.endsWith(".wal"));
        // listFiles() returns null when the path is not a readable directory.
        if (segments == null) {
            return 0;
        }
        int highest = 0;
        // The listing order is unspecified, so scan for the maximum instead of trusting the last entry.
        for (File segment : segments) {
            String fileName = segment.getName();
            String idText = fileName.substring(0, fileName.length() - ".wal".length());
            try {
                highest = Math.max(highest, Integer.parseInt(idText));
            } catch (NumberFormatException ignored) {
                // A ".wal" file without a numeric name is not one of our segments; skip it.
            }
        }
        return highest;
    }

    /** @return the current value of the segment counter */
    @Override // com.srotya.minuteman.wal.WAL
    public int getSegmentCounter() {
        return segmentCounter;
    }

    /**
     * @return the position of the current write buffer within its segment, or
     *         -1 when no segment has been opened yet (same sentinel as
     *         {@code getOffset()})
     */
    @Override // com.srotya.minuteman.wal.WAL
    public long getCurrentOffset() {
        // Read the field once so the null check and the dereference see the same buffer.
        MappedByteBuffer buffer = this.currentWrite;
        return buffer != null ? buffer.position() : -1L;
    }

    /** @return the current commit offset */
    @Override // com.srotya.minuteman.wal.WAL
    public long getCommitOffset() {
        return commitOffset;
    }

    /**
     * Updates the commit offset in memory and at position 0 of the mapped
     * metadata buffer, under the write lock.
     *
     * @param j new commit offset
     */
    @Override // com.srotya.minuteman.wal.WAL
    public void setCommitOffset(long j) {
        this.write.lock();
        try {
            logger.finer("Updated commit offset:" + j);
            this.commitOffset = j;
            this.metaBuf.putLong(0, j);
        } finally {
            // Release even if the metadata buffer is unavailable and the put fails.
            this.write.unlock();
        }
    }

    /**
     * @param str follower id
     * @return the offset recorded for the follower, or -1 when the follower is unknown
     */
    @Override // com.srotya.minuteman.wal.WAL
    public long getFollowerOffset(String str) {
        MappedWALFollower follower = this.followerMap.get(str);
        return follower == null ? -1L : follower.getOffset();
    }

    /**
     * Scans all followers and returns the smallest offset among those flagged
     * as ISR.
     *
     * @return the minimum ISR offset, or {@code Long.MAX_VALUE} when no follower is an ISR
     */
    private long getMinimumOffset() {
        logger.finest("Getting minimum offset among followers");
        long lowest = Long.MAX_VALUE;
        for (Map.Entry<String, MappedWALFollower> entry : this.followerMap.entrySet()) {
            MappedWALFollower follower = entry.getValue();
            if (!follower.isIsr()) {
                logger.finest("Ignoring(" + entry.getKey() + ") since it's not an ISR offset:" + follower.getOffset() + " current:" + this.currentWrite.position());
                continue;
            }
            long followerOffset = follower.getOffset();
            if (followerOffset < lowest) {
                lowest = followerOffset;
            }
        }
        logger.finest("Commit offset:" + lowest);
        return lowest;
    }

    /** @param z new value of the flag that gates segment deletion */
    @Override // com.srotya.minuteman.wal.WAL
    public void setWALDeletion(boolean z) {
        walDeletion.set(z);
    }

    /**
     * Deletes segment files that lie below every follower's previous segment.
     * The deletable bound is the smallest value of (follower segment id - 1)
     * across all followers; segments with an id lower than that bound are
     * removed. Does nothing while the deletion flag is false.
     *
     * @throws IOException declared for callers; not thrown by the visible code
     */
    private void deleteWALSegments() throws IOException {
        if (!this.walDeletion.get()) {
            return;
        }
        this.write.lock();
        try {
            int i = Integer.MAX_VALUE;
            String[] strArr = this.followerMap.keySet().toArray(new String[0]);
            logger.fine("Follower Count:" + strArr.length);
            for (String str : strArr) {
                int rawOffsetToSegmentId = rawOffsetToSegmentId(this.followerMap.get(str).getOffset()) - 1;
                if (rawOffsetToSegmentId < i) {
                    i = rawOffsetToSegmentId;
                }
            }
            logger.fine("Minimum segment to that can be deleted:" + i);
            if (i == this.segmentCounter || i == Integer.MAX_VALUE) {
                logger.fine("Minimum segment is also the current segment, ignoring delete");
            } else {
                logger.fine("Segment compaction, will delete:" + i + " files");
                for (int i2 = 0; i2 < i; i2++) {
                    String segmentFile = getSegmentFileName(this.walDirectory, i2);
                    File segment = new File(segmentFile);
                    // Only report a deletion that actually happened; earlier passes may
                    // already have removed this file.
                    if (segment.delete()) {
                        logger.fine("Segment compaction, deleting file:" + segmentFile);
                    } else if (segment.exists()) {
                        logger.warning("Segment compaction, unable to delete file:" + segmentFile);
                    }
                }
            }
        } finally {
            this.write.unlock();
        }
    }

    /** @return the key set of the follower map, i.e. the ids of all registered followers */
    @Override // com.srotya.minuteman.wal.WAL
    public Collection<String> getFollowers() {
        return followerMap.keySet();
    }

    /**
     * @param str follower id
     * @return true when the follower is registered and currently flagged as
     *         ISR; false otherwise, including for an unknown follower
     */
    @Override // com.srotya.minuteman.wal.WAL
    public boolean isIsr(String str) {
        MappedWALFollower follower = this.followerMap.get(str);
        // An unregistered follower cannot be in sync; previously this dereferenced null.
        return follower != null && follower.isIsr();
    }
}
