package com.srotya.minuteman.cluster;

import com.srotya.minuteman.connectors.ClusterConnector;
import com.srotya.minuteman.rpc.GenericResponse;
import com.srotya.minuteman.rpc.IsrUpdateRequest;
import com.srotya.minuteman.rpc.ReplicationServiceGrpc;
import com.srotya.minuteman.rpc.ReplicationServiceImpl;
import com.srotya.minuteman.wal.LocalWALClient;
import com.srotya.minuteman.wal.RemoteWALClient;
import com.srotya.minuteman.wal.WAL;
import io.grpc.BindableService;
import io.grpc.DecompressorRegistry;
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.log4j.spi.Configurator;

/* loaded from: input_file:com/srotya/minuteman/cluster/WALManagerImpl.class */
public class WALManagerImpl extends WALManager {
    private static final Logger logger = Logger.getLogger(WALManagerImpl.class.getName());
    private ClusterConnector connector;
    private Server server;
    private Class<LocalWALClient> walClientClass;
    private long allocator;
    private int isrUpdateFrequency;
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.WriteLock write = this.lock.writeLock();
    private List<Node> nodes = new LinkedList();
    private Map<String, Replica> localReplicaTable = new HashMap();
    private Map<String, List<Replica>> routeTable = new HashMap();

    @Override // com.srotya.minuteman.cluster.WALManager
    public void init(Map<String, String> map, ClusterConnector clusterConnector, ScheduledExecutorService scheduledExecutorService, Object obj) throws Exception {
        super.init(map, clusterConnector, scheduledExecutorService, obj);
        this.connector = clusterConnector;
        this.walClientClass = Class.forName(map.getOrDefault(WALManager.WAL_CLIENT_CLASS, LocalWALClient.class.getName()));
        clusterConnector.initializeRouterHooks(this);
        this.server = NettyServerBuilder.forPort(getPort()).decompressorRegistry(DecompressorRegistry.getDefaultInstance()).addService((BindableService) new ReplicationServiceImpl(this)).build().start();
        logger.info("Listening for GRPC requests on port:" + getPort());
        this.isrUpdateFrequency = Integer.parseInt(map.getOrDefault(WAL.WAL_ISRCHECK_FREQUENCY, "10"));
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            if (clusterConnector.getCoordinator() != null) {
                ReplicationServiceGrpc.ReplicationServiceBlockingStub newBlockingStub = ReplicationServiceGrpc.newBlockingStub(clusterConnector.getCoordinator().getChannel());
                for (Map.Entry<String, Replica> entry : this.localReplicaTable.entrySet()) {
                    if (isLeader(entry.getValue())) {
                        HashMap hashMap = new HashMap();
                        WAL wal = entry.getValue().getWal();
                        logger.fine("Updating ISRs for(" + entry.getKey() + ") followers:" + wal.getFollowers());
                        for (String str : wal.getFollowers()) {
                            hashMap.put(str, Boolean.valueOf(wal.isIsr(str)));
                        }
                        GenericResponse updateIsr = newBlockingStub.updateIsr(IsrUpdateRequest.newBuilder().setRouteKey(entry.getKey()).putAllIsrMap(hashMap).build());
                        if (updateIsr.getResponseCode() == 200) {
                            logger.fine("Updated ISRs with coordinator for routeKey:" + entry.getKey());
                        } else {
                            logger.severe("ISR update with coordinator failed for routeKey:" + entry.getKey() + " reason:" + updateIsr.getResponseString() + " code:" + updateIsr.getResponseCode());
                        }
                    }
                }
            }
        }, 0L, this.isrUpdateFrequency, TimeUnit.SECONDS);
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public void updateReplicaIsrStatus(String str, Map<String, Boolean> map) throws Exception {
        if (!this.connector.isCoordinator()) {
            throw new UnsupportedOperationException("This(" + getThisNodeKey() + ") is not a coordinator(" + getCoordinator().getNodeKey() + ") node, can't perform ISR updates");
        }
        this.write.lock();
        try {
            try {
                List<Replica> list = this.routeTable.get(str);
                if (list != null) {
                    for (Replica replica : list) {
                        Boolean bool = map.get(replica.getReplicaNodeKey());
                        if (bool != null) {
                            replica.setIsr(bool.booleanValue());
                        } else {
                            logger.severe("Missing ISR status for replica(" + replica.getReplicaNodeKey() + ") for route key(" + str + ")");
                        }
                    }
                    this.connector.updateTable(this.routeTable);
                }
            } catch (Exception e) {
                throw e;
            }
        } finally {
            this.write.unlock();
        }
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public List<Replica> addRoutableKey(String str, int i) throws Exception {
        if (!this.connector.isCoordinator()) {
            throw new UnsupportedOperationException("This(" + getThisNodeKey() + ") is not a coordinator(" + getCoordinator().getNodeKey() + ") node, can't perform route modifications");
        }
        if (i > this.nodes.size()) {
            throw new IllegalArgumentException("Fewer nodes(" + this.nodes.size() + ") in the cluster than requested replication factor(" + i + ")");
        }
        this.write.lock();
        try {
            try {
                logger.info("Replication factor:" + i + " requested for new key:" + str);
                List<Replica> list = this.routeTable.get(str);
                if (list == null) {
                    list = new ArrayList();
                    this.routeTable.put(str, list);
                    Replica replica = new Replica();
                    Node node = getNode();
                    nodeToReplica(str, replica, node, node);
                    list.add(0, replica);
                    for (int i2 = 1; i2 < i; i2++) {
                        Node node2 = getNode();
                        Replica replica2 = new Replica();
                        nodeToReplica(str, replica2, node, node2);
                        list.add(i2, replica2);
                    }
                    logger.info("Route key:" + str + " has replicas:" + list.stream().map(replica3 -> {
                        return replica3.getReplicaNodeKey();
                    }).collect(Collectors.toList()) + " leader:" + list.get(0).getReplicaNodeKey());
                    Iterator<Replica> it = list.iterator();
                    while (it.hasNext()) {
                        updateReplica(it.next());
                    }
                    this.connector.updateTable(this.routeTable);
                    this.allocator++;
                }
                return list;
            } catch (Exception e) {
                logger.log(Level.SEVERE, "Failed to update route table in distributed metastore", (Throwable) e);
                throw e;
            }
        } finally {
            this.write.unlock();
        }
    }

    private void updateReplica(Replica replica) throws IOException, Exception {
        if (isLocal(replica)) {
            logger.info(replica.getReplicaNodeKey() + " is local on: " + getThisNodeKey());
            replicaUpdated(replica);
        } else {
            logger.info(replica.getReplicaNodeKey() + " is NOT local on: " + getThisNodeKey());
            this.connector.updateReplicaRoute(this, replica, false);
        }
    }

    private Node getNode() {
        List<Node> list = this.nodes;
        long j = this.allocator;
        this.allocator = j + 1;
        Node node = list.get((int) (j % this.nodes.size()));
        logger.info("NODE:" + node.getNodeKey());
        return node;
    }

    private void nodeToReplica(String str, Replica replica, Node node, Node node2) {
        replica.setLeaderAddress(node.getAddress());
        replica.setLeaderPort(node.getPort());
        replica.setRouteKey(str);
        replica.setReplicaAddress(node2.getAddress());
        replica.setReplicaPort(node2.getPort());
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public void addNode(Node node) throws IOException {
        this.write.lock();
        if (getNodeMap().get(node.getNodeKey()) == null) {
            logger.info("Adding node(" + node.getNodeKey() + ") to WALManager");
            this.nodes.add(node);
            getNodeMap().put(node.getNodeKey(), node);
            logger.info("Node(" + node.getNodeKey() + ") added to WALManager");
        } else {
            logger.info("Node(" + node.getNodeKey() + ") is already present");
        }
        this.write.unlock();
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public void removeNode(String str) throws Exception {
        this.write.lock();
        try {
            if (this.connector.isCoordinator()) {
                for (Map.Entry<String, List<Replica>> entry : this.routeTable.entrySet()) {
                    List<Replica> value = entry.getValue();
                    if (str.equals(value.get(0).getLeaderNodeKey())) {
                        value.remove(0);
                        Replica firstIsr = getFirstIsr(value);
                        if (value.isEmpty() || firstIsr == null) {
                            logger.info("No suitable replicas left for this route key:" + entry.getKey());
                        } else {
                            firstIsr.setLeaderAddress(firstIsr.getReplicaAddress());
                            firstIsr.setLeaderPort(firstIsr.getReplicaPort());
                            updateReplica(firstIsr);
                            for (int i = 0; i < value.size(); i++) {
                                Replica replica = value.get(i);
                                replica.setLeaderAddress(firstIsr.getLeaderAddress());
                                replica.setLeaderPort(firstIsr.getLeaderPort());
                                updateReplica(replica);
                            }
                        }
                    }
                }
                this.connector.updateTable(this.routeTable);
            }
            logger.info("Removing node(" + str + ") from WALManager");
            Node remove = getNodeMap().remove(str);
            this.nodes.remove(remove);
            if (remove != null) {
                Iterator<Map.Entry<String, Replica>> it = this.localReplicaTable.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<String, Replica> next = it.next();
                    if (str.equals(next.getValue().getLeaderNodeKey())) {
                        next.getValue().getClient().stop();
                        it.remove();
                    }
                }
            }
            logger.info("Node(" + str + ") removed from WALManager");
            this.write.unlock();
        } catch (Throwable th) {
            this.write.unlock();
            throw th;
        }
    }

    private Replica getFirstIsr(List<Replica> list) {
        for (Replica replica : list) {
            if (replica.isIsr()) {
                return replica;
            }
        }
        return null;
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public void replicaUpdated(Replica replica) throws IOException {
        this.write.lock();
        try {
            try {
                logger.info("Replica updated:" + replica.getLeaderNodeKey() + "\t\t node:" + getThisNodeKey());
                Replica replica2 = this.localReplicaTable.get(replica.getRouteKey());
                if (replica2 == null) {
                    replica2 = replica;
                    this.localReplicaTable.put(replica.getRouteKey(), replica);
                    replica.setWal(super.initializeWAL(replica.getRouteKey()));
                } else if (replica2.getClient() != null) {
                    replica2.getClient().stop();
                }
                if (!isLeader(replica2)) {
                    Node node = getNodeMap().get(replica.getLeaderNodeKey());
                    logger.info("Node leader update:" + getThisNodeKey() + "\tReplica leader:" + (node != null ? node.getNodeKey() : Configurator.NULL) + "\t" + replica.getLeaderNodeKey() + "\t" + replica2.getWal() + "\t" + replica2.getRouteKey());
                    replica2.setClient(new RemoteWALClient().configure(getConf(), getThisNodeKey(), node.getChannel(), replica2.getWal(), replica2.getRouteKey()));
                    Thread thread = new Thread(replica2.getClient());
                    thread.setDaemon(true);
                    thread.start();
                    logger.info("Starting replication thread for:" + replica.getRouteKey() + " on replica => " + replica2.getReplicaNodeKey());
                }
                if (replica2.getLocal() == null) {
                    replica2.setLocal(this.walClientClass.newInstance().configure(getConf(), getThisNodeKey(), replica2.getWal(), this.storageObject));
                    Thread thread2 = new Thread(replica2.getLocal());
                    thread2.setDaemon(true);
                    thread2.start();
                    logger.info("Starting local follower thread for:" + replica.getRouteKey() + " on replica => " + replica2.getReplicaNodeKey());
                }
            } catch (IllegalAccessException | InstantiationException e) {
                throw new IOException(e);
            }
        } finally {
            this.write.unlock();
        }
    }

    private boolean isLeader(Replica replica) {
        return replica.getLeaderNodeKey().equals(replica.getReplicaNodeKey());
    }

    private boolean isLocal(Replica replica) {
        return replica.getReplicaNodeKey().equals(getThisNodeKey());
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public void replicaRemoved(Replica replica) throws Exception {
        this.write.lock();
        logger.info("Removing replica " + replica.getRouteKey() + " cleaning up entries on " + getThisNodeKey());
        Replica replica2 = this.localReplicaTable.get(replica.getRouteKey());
        if (replica2 != null) {
            replica2.getClient().stop();
            logger.info("Replica removed " + replica.getRouteKey() + " on " + getThisNodeKey());
        } else {
            logger.info("Replica already removed, nothing to do");
        }
        this.write.unlock();
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public void makeCoordinator() throws Exception {
        this.write.lock();
        this.routeTable = (Map) this.connector.fetchRoutingTable(10);
        logger.info("Fetched latest route table from metastore:" + this.routeTable);
        if (this.routeTable == null) {
            ClusterConnector clusterConnector = this.connector;
            HashMap hashMap = new HashMap();
            this.routeTable = hashMap;
            clusterConnector.updateTable(hashMap);
            logger.info("No route table in metastore, created an empty one");
        } else if (getCoordinator() == null || getThisNodeKey().equals(getCoordinator().getNodeKey())) {
            logger.warning("Ignoring route table correction because of self last coordinator");
        } else {
            removeNode(getCoordinator().getNodeKey());
        }
        this.write.unlock();
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public Object getRoutingTable() {
        return this.localReplicaTable;
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public void stop() throws InterruptedException {
        for (Map.Entry<String, Replica> entry : this.localReplicaTable.entrySet()) {
            try {
                logger.info("Attempting to stop replica:" + entry.getKey() + " on node:" + getThisNodeKey());
                entry.getValue().getLocal().stop();
                if (entry.getValue().getClient() != null) {
                    entry.getValue().getClient().stop();
                }
                entry.getValue().getWal().close();
                logger.info("Stopped replica:" + entry.getKey() + " on node:" + getThisNodeKey());
            } catch (Exception e) {
                logger.log(Level.SEVERE, "Failed to stop wal:" + entry.getKey() + "\t on replica:" + entry.getValue().getReplicaNodeKey() + "\tleader:" + entry.getValue().getLeaderNodeKey(), (Throwable) e);
            }
        }
        this.server.shutdownNow().awaitTermination(5L, TimeUnit.SECONDS);
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public void resume() throws IOException {
        Map map = (Map) this.connector.fetchRoutingTable(10);
        if (map != null) {
            Iterator it = map.entrySet().iterator();
            while (it.hasNext()) {
                for (Replica replica : (List) ((Map.Entry) it.next()).getValue()) {
                    if (replica.getReplicaNodeKey().equals(getThisNodeKey())) {
                        logger.info("Found replica assignment for local node, resuming:" + replica.getReplicaNodeKey() + "\t" + replica.getRouteKey());
                        replicaUpdated(replica);
                    }
                }
            }
        }
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public WAL getWAL(String str) throws IOException {
        Replica replica = this.localReplicaTable.get(str);
        if (replica == null) {
            return null;
        }
        return replica.getWal();
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public void setCoordinator(Node node) {
        this.write.lock();
        super.setCoordinator(node);
        this.write.unlock();
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public String getReplicaLeader(String str) {
        List<Replica> list = this.routeTable.get(str);
        if (list != null) {
            return list.get(0).getLeaderNodeKey();
        }
        return null;
    }

    @Override // com.srotya.minuteman.cluster.WALManager
    public void setRouteTable(Object obj) {
        this.write.lock();
        this.routeTable = (Map) obj;
        this.write.unlock();
    }
}
