package com.srotya.minuteman.wal;

import com.google.protobuf.ByteString;
import com.srotya.minuteman.cluster.WALManager;
import com.srotya.minuteman.rpc.BatchDataRequest;
import com.srotya.minuteman.rpc.BatchDataResponse;
import com.srotya.minuteman.rpc.ReplicationServiceGrpc;
import io.grpc.ManagedChannel;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/srotya/minuteman/wal/RemoteWALClient.class */
/**
 * {@link WALClient} that pulls batches of WAL entries from a remote node through the
 * {@code ReplicationService} blocking gRPC stub and appends them to the local {@link WAL}.
 *
 * <p>Each call to {@link #iterate()} performs one poll: request a batch starting at the current
 * offset, write any returned entries locally, then advance the offset to the value reported by
 * the remote side.
 */
public class RemoteWALClient extends WALClient {
    private static final Logger logger = Logger.getLogger(RemoteWALClient.class.getName());
    /** Running total of entries received from the remote node; reset by {@link #configure}. */
    private AtomicInteger counter;
    private ReplicationServiceGrpc.ReplicationServiceBlockingStub stub;
    /** Accumulated wall-clock milliseconds spent in replication requests. */
    private final AtomicLong metricRequestTime = new AtomicLong();
    /** Accumulated wall-clock milliseconds spent writing received batches to the local WAL. */
    private final AtomicLong metricWriteTime = new AtomicLong();
    /** Number of times {@link #iterate()} has been entered. */
    private final AtomicLong loopCounter = new AtomicLong();
    private String routeKey;

    /**
     * Configures this client and builds the blocking replication stub.
     *
     * @param map configuration; {@code WALManager.CLUSTER_GRPC_COMPRESSION} selects the gRPC
     *        compression codec and defaults to {@code "gzip"} when absent
     * @param str forwarded unchanged to {@code WALClient.configure} (presumably the node id;
     *        confirm against the superclass)
     * @param managedChannel channel used to create the replication stub
     * @param wal local WAL that replicated entries are written to
     * @param str2 route key sent with every batch request
     * @return this client, to allow call chaining
     * @throws IOException if the superclass configuration fails
     */
    public RemoteWALClient configure(Map<String, String> map, String str, ManagedChannel managedChannel, WAL wal, String str2) throws IOException {
        super.configure(map, str, wal);
        this.routeKey = str2;
        this.counter = new AtomicInteger(0);
        String compression = map.getOrDefault(WALManager.CLUSTER_GRPC_COMPRESSION, "gzip");
        this.stub = ReplicationServiceGrpc.newBlockingStub(managedChannel).withCompression(compression);
        return this;
    }

    /**
     * Performs a single replication poll against the remote node.
     *
     * <p>When the response carries no data the thread sleeps for {@code retryWait}; otherwise the
     * entries are written to the local WAL and the commit offset is updated. Any failure is logged
     * and followed by an {@code errorRetryWait} back-off. An interrupt received during either
     * sleep stops the client and restores the thread's interrupt status.
     */
    @Override // com.srotya.minuteman.wal.WALClient
    public void iterate() {
        try {
            this.loopCounter.incrementAndGet();
            // Supplier overload: the message is only built when FINE is actually enabled.
            logger.fine(() -> "CLIENT: Requesting data:" + this.offset);
            long requestStart = System.currentTimeMillis();
            BatchDataRequest request = BatchDataRequest.newBuilder()
                    .setNodeId(this.nodeId)
                    .setOffset(this.offset)
                    .setMaxBytes(this.maxFetchBytes)
                    .setRouteKey(this.routeKey)
                    .build();
            BatchDataResponse response = this.stub.requestBatchReplication(request);
            this.metricRequestTime.getAndAdd(System.currentTimeMillis() - requestStart);
            if (response.getDataList() == null || response.getDataList().isEmpty()) {
                logger.fine(() -> "CLIENT: No data to replicate, delaying poll, offset:" + this.offset);
                Thread.sleep(this.retryWait);
            } else {
                long writeStart = System.currentTimeMillis();
                try {
                    for (ByteString entry : response.getDataList()) {
                        this.wal.write(entry.toByteArray(), false);
                    }
                } catch (Exception e) {
                    // NOTE(review): a local write failure is only logged; the fetch offset and
                    // commit offset below still advance, so the unwritten entries of this batch
                    // are never requested again. Behaviour kept as-is because retrying the batch
                    // could duplicate entries that were already written - confirm the intended
                    // recovery semantics.
                    logger.log(Level.SEVERE, "Failure to write to local WAL", (Throwable) e);
                }
                this.metricWriteTime.getAndAdd(System.currentTimeMillis() - writeStart);
                logger.fine(() -> "CLIENT: Client received:" + response.getDataList().size() + " messages \t fileId:" + response.getNextOffset());
                this.counter.addAndGet(response.getDataList().size());
                this.wal.setCommitOffset(response.getCommitOffset());
            }
            this.offset = response.getNextOffset();
        } catch (InterruptedException interrupted) {
            // Interrupted while idling between polls: treat it as a stop request instead of
            // reporting it as a replication failure and sleeping again.
            handleInterrupt();
        } catch (Exception failure) {
            logger.log(Level.SEVERE, "Failure to replicate WAL", (Throwable) failure);
            try {
                Thread.sleep(this.errorRetryWait);
            } catch (InterruptedException interrupted) {
                handleInterrupt();
            }
        }
    }

    /** Stops the client in response to an interrupt and restores the thread's interrupt status. */
    private void handleInterrupt() {
        logger.severe("CLIENT: Remote client interrupt received, breaking loop");
        stop();
        // Re-assert the flag only after stop() so callers further up the stack can observe the
        // interrupt without stop() itself running with the flag set.
        Thread.currentThread().interrupt();
    }

    /**
     * @return the current offset reported by the local WAL
     */
    public long getPos() {
        return this.wal.getCurrentOffset();
    }

    /**
     * @return the local WAL this client writes to
     */
    public WAL getWal() {
        return this.wal;
    }
}
