package com.srotya.minuteman.rpc;

import com.google.protobuf.ByteString;
import com.srotya.minuteman.cluster.Replica;
import com.srotya.minuteman.cluster.WALManager;
import com.srotya.minuteman.rpc.BatchDataResponse;
import com.srotya.minuteman.rpc.GenericResponse;
import com.srotya.minuteman.rpc.ReplicationServiceGrpc;
import com.srotya.minuteman.rpc.RouteResponse;
import com.srotya.minuteman.wal.WAL;
import com.srotya.minuteman.wal.WALRead;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.zookeeper.server.ZKDatabase;

/* loaded from: input_file:com/srotya/minuteman/rpc/ReplicationServiceImpl.class */
public class ReplicationServiceImpl extends ReplicationServiceGrpc.ReplicationServiceImplBase {
    private static final Logger logger = Logger.getLogger(ReplicationServiceImpl.class.getName());
    private WALManager mgr;

    public ReplicationServiceImpl(WALManager wALManager) {
        this.mgr = wALManager;
    }

    @Override // com.srotya.minuteman.rpc.ReplicationServiceGrpc.ReplicationServiceImplBase
    public void requestBatchReplication(BatchDataRequest batchDataRequest, StreamObserver<BatchDataResponse> streamObserver) {
        BatchDataResponse.Builder newBuilder = BatchDataResponse.newBuilder();
        try {
            WAL wal = this.mgr.getWAL(batchDataRequest.getRouteKey());
            if (wal != null) {
                WALRead read = wal.read(batchDataRequest.getNodeId(), batchDataRequest.getOffset(), batchDataRequest.getMaxBytes(), false);
                newBuilder.setNextOffset(read.getNextOffset()).setCommitOffset(read.getCommitOffset());
                if (read.getData() != null) {
                    Iterator<byte[]> it = read.getData().iterator();
                    while (it.hasNext()) {
                        newBuilder.addData(ByteString.copyFrom(it.next()));
                    }
                }
                streamObserver.onNext(newBuilder.build());
            } else {
                newBuilder.setNextOffset(-1L).setResponseCode(404);
                streamObserver.onNext(newBuilder.build());
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "Failed request to add new route key:" + batchDataRequest.getRouteKey(), (Throwable) e);
            streamObserver.onNext(newBuilder.setResponseCode(ZKDatabase.commitLogCount).build());
        }
        streamObserver.onCompleted();
    }

    @Override // com.srotya.minuteman.rpc.ReplicationServiceGrpc.ReplicationServiceImplBase
    public void addRoute(RouteRequest routeRequest, StreamObserver<RouteResponse> streamObserver) {
        RouteResponse.Builder newBuilder = RouteResponse.newBuilder();
        try {
            logger.info("Request to add new route key and compute routes for it:" + routeRequest.getRouteKey());
            List<Replica> addRoutableKey = this.mgr.addRoutableKey(routeRequest.getRouteKey(), routeRequest.getReplicationFactor());
            streamObserver.onNext(newBuilder.addAllReplicaids((List) addRoutableKey.stream().map(replica -> {
                return replica.getReplicaNodeKey();
            }).collect(Collectors.toList())).setLeaderid(addRoutableKey.get(0).getLeaderNodeKey()).setResponseCode(200).setResponseString("Successful").build());
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed request to add new route key:" + routeRequest.getRouteKey() + " reason:" + e.getMessage());
            logger.log(Level.FINE, "Route request failure reason for key:" + routeRequest.getRouteKey(), (Throwable) e);
            streamObserver.onNext(newBuilder.setResponseCode(ZKDatabase.commitLogCount).setResponseString(e.getMessage()).build());
        }
        streamObserver.onCompleted();
    }

    @Override // com.srotya.minuteman.rpc.ReplicationServiceGrpc.ReplicationServiceImplBase
    public void addReplica(ReplicaRequest replicaRequest, StreamObserver<GenericResponse> streamObserver) {
        try {
            Replica replica = new Replica();
            replica.setLeaderAddress(replicaRequest.getLeaderAddress());
            replica.setLeaderPort(replicaRequest.getLeaderPort());
            replica.setReplicaAddress(replicaRequest.getReplicaAddress());
            replica.setReplicaPort(replicaRequest.getReplicaPort());
            replica.setRouteKey(replicaRequest.getRouteKey());
            logger.info("Request to add replica:" + replicaRequest.getRouteKey());
            this.mgr.replicaUpdated(replica);
            streamObserver.onNext(GenericResponse.newBuilder().setResponseCode(200).setResponseString("Success!").build());
        } catch (Exception e) {
            e.printStackTrace();
            streamObserver.onNext(GenericResponse.newBuilder().setResponseCode(ZKDatabase.commitLogCount).setResponseString("Failed to add replica:" + e.getMessage()).build());
        }
        logger.info("Request to add replica COMPLETED:" + replicaRequest.getRouteKey());
        streamObserver.onCompleted();
    }

    @Override // com.srotya.minuteman.rpc.ReplicationServiceGrpc.ReplicationServiceImplBase
    public void writeData(DataRequest dataRequest, StreamObserver<GenericResponse> streamObserver) {
        try {
            WAL wal = this.mgr.getWAL(dataRequest.getRouteKey());
            if (wal != null) {
                wal.write(dataRequest.getData().toByteArray(), false);
                streamObserver.onNext(GenericResponse.newBuilder().setResponseCode(200).setResponseString("Success!").build());
            } else {
                streamObserver.onNext(GenericResponse.newBuilder().setResponseCode(400).setResponseString("Wal not found on node:" + dataRequest.getRouteKey() + " " + this.mgr.getThisNodeKey()).build());
            }
        } catch (Exception e) {
            e.printStackTrace();
            streamObserver.onNext(GenericResponse.newBuilder().setResponseCode(ZKDatabase.commitLogCount).setResponseString("Failed to write data:" + e.getMessage()).build());
        }
        streamObserver.onCompleted();
    }

    @Override // com.srotya.minuteman.rpc.ReplicationServiceGrpc.ReplicationServiceImplBase
    public void updateIsr(IsrUpdateRequest isrUpdateRequest, StreamObserver<GenericResponse> streamObserver) {
        String routeKey = isrUpdateRequest.getRouteKey();
        Map<String, Boolean> isrMapMap = isrUpdateRequest.getIsrMapMap();
        GenericResponse.Builder newBuilder = GenericResponse.newBuilder();
        try {
            this.mgr.updateReplicaIsrStatus(routeKey, isrMapMap);
            newBuilder.setResponseCode(200);
        } catch (Exception e) {
            e.printStackTrace();
            logger.log(Level.SEVERE, "Failed to update ISR status for coordinator", (Throwable) e);
            newBuilder.setResponseCode(ZKDatabase.commitLogCount);
            newBuilder.setResponseString(e.getMessage());
        }
        streamObserver.onNext(newBuilder.build());
        streamObserver.onCompleted();
    }
}
