package com.srotya.minuteman.connectors;

import com.srotya.minuteman.cluster.Node;
import com.srotya.minuteman.cluster.Replica;
import com.srotya.minuteman.cluster.WALManager;
import io.atomix.AtomixReplica;
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.group.DistributedGroup;
import io.atomix.group.GroupMember;
import io.atomix.group.election.Term;
import io.atomix.variables.DistributedValue;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/srotya/minuteman/connectors/AtomixConnector.class */
public class AtomixConnector extends ClusterConnector {
    /** Config key: milliseconds slept between routing-table fetch retries (parsed in init, default "1000"). */
    public static final String FETCH_RETRY_INTERVAL = "cluster.atomix.fetch.retry.interval.ms";
    /** Config key: comma-separated list of cluster addresses passed to bootstrap/join (default "localhost:8901"). */
    public static final String CLUSTER_ATOMIX_BOOTSTRAP_ADDRESSES = "cluster.atomix.bootstrap.addresses";
    /** Config key: boolean; when true init bootstraps the cluster, otherwise it joins it (default "true"). */
    public static final String CLUSTER_ATOMIX_BOOTSTRAP = "cluster.atomix.bootstrap";
    /** Config key: heartbeat interval, interpreted by init as whole seconds (default "5"). */
    public static final String CLUSTER_ATOMIX_HEARTBEAT_INTERVAL = "cluster.atomix.heartbeat.interval";
    /** Config key: election timeout, interpreted by init as whole seconds (default "10"). */
    public static final String CLUSTER_ATOMIX_ELECTION_TIMEOUT = "cluster.atomix.election.timeout";
    /** Config key: directory handed to the Atomix storage builder (default "/tmp/sidewinder-atomix"). */
    public static final String CLUSTER_ATOMIX_STORAGE_DIRECTORY = "cluster.atomix.storage.directory";
    /** Config key: name of a StorageLevel constant, resolved with StorageLevel.valueOf (default "MEMORY"). */
    public static final String CLUSTER_ATOMIX_STORAGE_LEVEL = "cluster.atomix.storage.level";
    /** Config key: port the local Atomix replica is addressed by (default "8901"). */
    public static final String CLUSTER_ATOMIX_PORT = "cluster.atomix.port";
    /** Config key: host the local Atomix replica is addressed by (default WALManager.DEFAULT_CLUSTER_HOST). */
    public static final String CLUSTER_ATOMIX_HOST = "cluster.atomix.host";
    /** Name of the Atomix distributed value that carries the routing table. */
    private static final String TABLE = "table";
    /** Name of the Atomix distributed group used for membership tracking and leader election. */
    private static final String BROADCAST_GROUP = "controller";
    private static final Logger logger = Logger.getLogger(AtomixConnector.class.getName());
    /** Replica built and started by init. */
    private AtomixReplica atomix;
    /** Whether init bootstrapped the cluster (true) or joined an existing one (false). */
    private boolean isBootstrap;
    /** Local node address, copied from the WALManager in initializeRouterHooks. */
    private String address;
    /** Local node port, copied from the WALManager in initializeRouterHooks. */
    private int port;
    /** Set from the election callback: true when the elected leader id matches this node. */
    private volatile boolean isLeader;
    /** Group handle obtained in initializeRouterHooks. */
    private DistributedGroup group;
    /** Node of the most recently elected leader, assigned from the election callback. */
    protected Node coordinator;
    /** Parsed value of FETCH_RETRY_INTERVAL, in milliseconds. */
    private int fetchRetryInterval;

    /* loaded from: input_file:com/srotya/minuteman/connectors/AtomixConnector$NodeSerializer.class */
    /**
     * Catalyst serializer for {@link Node}: a node travels as its address
     * (UTF-8 string) followed by its port (int).
     */
    public static class NodeSerializer implements TypeSerializer<Node> {

        /**
         * Writes the node's address and then its port to the buffer.
         */
        @Override
        public void write(Node node, BufferOutput out, Serializer serializer) {
            out.writeUTF8(node.getAddress());
            out.writeInt(node.getPort());
        }

        /**
         * Reads address and port (in that order) and rebuilds the node, using
         * {@code "address:port"} as the first constructor argument.
         */
        @Override
        public Node read(Class<Node> type, BufferInput in, Serializer serializer) {
            final String host = in.readUTF8();
            final int nodePort = in.readInt();
            final String key = host + ":" + nodePort;
            return new Node(key, host, nodePort);
        }
    }

    /* loaded from: input_file:com/srotya/minuteman/connectors/AtomixConnector$ReplicaSerializer.class */
    /**
     * Catalyst serializer for {@link Replica}.
     *
     * <p>Wire layout, in order: leader address (UTF-8), leader port (int),
     * replica address (UTF-8), replica port (int), route key (UTF-8), ISR flag
     * (boolean). Strings are written with {@code writeUTF8}, so they are read
     * back with {@code readUTF8} to keep both directions on the same charset.
     */
    public static class ReplicaSerializer implements TypeSerializer<Replica> {

        /**
         * Rebuilds a replica from the buffer, consuming fields in the same
         * order that {@link #write} emits them.
         */
        @Override
        public Replica read(Class<Replica> cls, BufferInput bufferInput, Serializer serializer) {
            Replica replica = new Replica();
            // readUTF8 mirrors the writeUTF8 calls in write(); the previous
            // readString() did not match the method used to write the strings.
            replica.setLeaderAddress(bufferInput.readUTF8());
            replica.setLeaderPort(bufferInput.readInt());
            replica.setReplicaAddress(bufferInput.readUTF8());
            replica.setReplicaPort(bufferInput.readInt());
            replica.setRouteKey(bufferInput.readUTF8());
            replica.setIsr(bufferInput.readBoolean());
            return replica;
        }

        /**
         * Writes the replica's fields to the buffer in the layout described on
         * the class.
         */
        @Override
        public void write(Replica replica, BufferOutput bufferOutput, Serializer serializer) {
            bufferOutput.writeUTF8(replica.getLeaderAddress());
            bufferOutput.writeInt(replica.getLeaderPort());
            bufferOutput.writeUTF8(replica.getReplicaAddress());
            bufferOutput.writeInt(replica.getReplicaPort());
            bufferOutput.writeUTF8(replica.getRouteKey());
            bufferOutput.writeBoolean(replica.isIsr());
        }
    }

    /**
     * Builds the Atomix replica from the supplied configuration and either
     * bootstraps a cluster or joins an existing one.
     *
     * <p>Blocks until the bootstrap/join future completes.
     *
     * @param map configuration keyed by the {@code CLUSTER_ATOMIX_*} and
     *            {@code FETCH_RETRY_INTERVAL} constants; missing keys fall back
     *            to the defaults used below
     * @throws IllegalArgumentException if the bootstrap address list contains
     *                                  no usable entry
     * @throws Exception if a numeric setting cannot be parsed or the cluster
     *                   bootstrap/join fails
     */
    @Override
    public void init(Map<String, String> map) throws Exception {
        String host = map.getOrDefault(CLUSTER_ATOMIX_HOST, WALManager.DEFAULT_CLUSTER_HOST);
        int atomixPort = Integer.parseInt(map.getOrDefault(CLUSTER_ATOMIX_PORT, "8901"));
        AtomixReplica.Builder builder = AtomixReplica.builder(new Address(host, atomixPort));

        Storage.Builder storageBuilder = Storage.builder();
        storageBuilder.withStorageLevel(StorageLevel.valueOf(map.getOrDefault(CLUSTER_ATOMIX_STORAGE_LEVEL, "MEMORY")));
        storageBuilder.withDirectory(map.getOrDefault(CLUSTER_ATOMIX_STORAGE_DIRECTORY, "/tmp/sidewinder-atomix"));

        this.fetchRetryInterval = Integer.parseInt(map.getOrDefault(FETCH_RETRY_INTERVAL, "1000"));

        // Election timeout and heartbeat interval are configured in whole seconds.
        int electionTimeoutSeconds = Integer.parseInt(map.getOrDefault(CLUSTER_ATOMIX_ELECTION_TIMEOUT, "10"));
        int heartbeatSeconds = Integer.parseInt(map.getOrDefault(CLUSTER_ATOMIX_HEARTBEAT_INTERVAL, "5"));
        this.atomix = builder.withStorage(storageBuilder.build())
                .withSessionTimeout(Duration.ofSeconds(10L))
                .withGlobalSuspendTimeout(Duration.ofMinutes(2L))
                .withType(AtomixReplica.Type.ACTIVE)
                .withElectionTimeout(Duration.ofSeconds(electionTimeoutSeconds))
                .withHeartbeatInterval(Duration.ofSeconds(heartbeatSeconds))
                .build();
        this.atomix.serializer().register(Node.class, NodeSerializer.class);
        this.atomix.serializer().register(Replica.class, ReplicaSerializer.class);

        this.isBootstrap = Boolean.parseBoolean(map.getOrDefault(CLUSTER_ATOMIX_BOOTSTRAP, "true"));

        // Tolerate whitespace around entries ("a:1, b:2") and stray commas.
        String addressList = map.getOrDefault(CLUSTER_ATOMIX_BOOTSTRAP_ADDRESSES, "localhost:8901");
        ArrayList<Address> bootstrapAddresses = new ArrayList<>();
        for (String entry : addressList.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                bootstrapAddresses.add(new Address(trimmed));
            }
        }
        if (bootstrapAddresses.isEmpty()) {
            throw new IllegalArgumentException(
                    "No usable address in " + CLUSTER_ATOMIX_BOOTSTRAP_ADDRESSES + ": '" + addressList + "'");
        }

        if (this.isBootstrap) {
            logger.info("Joining cluster as bootstrap node");
            this.atomix.bootstrap(bootstrapAddresses).join();
            // Requests the routing-table value; the returned future is not awaited here.
            this.atomix.getValue(TABLE);
        } else {
            logger.info("Joining cluster as a member node");
            this.atomix.join(bootstrapAddresses).get();
        }
        logger.info("Atomix clustering initialized");
    }

    /**
     * Joins the controller group and wires cluster events into the given
     * {@link WALManager}.
     *
     * <p>Steps, in order:
     * <ol>
     * <li>record the manager's address and port as this node's identity;</li>
     * <li>obtain the controller group (member expiration of 20 seconds);</li>
     * <li>register election, join and leave listeners;</li>
     * <li>join the group with the id {@code "address:port"};</li>
     * <li>add every already-present remote member to the manager;</li>
     * <li>resume the manager and subscribe to routing-table changes.</li>
     * </ol>
     *
     * <p>Failures inside the listeners are logged and do not propagate. A
     * failure to subscribe to routing-table changes is logged as well; if the
     * thread was interrupted its interrupt flag is restored.
     *
     * @param wALManager manager that receives node, coordinator and
     *                   routing-table updates
     * @throws IOException if adding an existing group member to the manager fails
     */
    @Override
    public void initializeRouterHooks(final WALManager wALManager) throws IOException {
        this.port = wALManager.getPort();
        this.address = wALManager.getAddress();
        this.group = getAtomix().getGroup(BROADCAST_GROUP, new DistributedGroup.Config().withMemberExpiration(Duration.ofSeconds(20L))).join();

        this.group.election().onElection(term -> {
            String leaderId = term.leader().id();
            logger.info("Completed leader election, new leader is => " + leaderId);
            if (isLocal(leaderId)) {
                this.isLeader = true;
                try {
                    wALManager.makeCoordinator();
                } catch (Exception e) {
                    // Best effort: keep processing the election, but keep the stack trace in the log.
                    logger.log(Level.SEVERE, "Error making corrdinator", e);
                }
            } else {
                this.isLeader = false;
                logger.info("This node is not the leader");
            }
            Node leaderNode = wALManager.getNodeMap().get(leaderId);
            if (leaderNode == null) {
                // The leader is not in the node map; build a node from its id.
                logger.info("Leader node is empty:" + leaderId);
                leaderNode = buildNode(leaderId);
            }
            wALManager.setCoordinator(leaderNode);
            this.coordinator = leaderNode;
            logger.info("Node:" + this.address + ":" + this.port + "\t leader status:" + this.isLeader);
        });

        this.group.onJoin(member -> {
            logger.info("Node found:" + member.id());
            try {
                wALManager.addNode(buildNode(member.id()));
            } catch (IOException e) {
                logger.log(Level.SEVERE, "Failed to add node " + member.id(), e);
            }
        });

        this.group.onLeave(member -> {
            logger.info("Node left:" + member.id());
            try {
                wALManager.removeNode(member.id());
            } catch (Exception e) {
                logger.log(Level.SEVERE, "Failed to remove node " + member.id(), e);
            }
        });

        this.group.join(this.address + ":" + this.port).join();
        logger.info("Created cluster using Atomix connector");

        // Members that joined before this node's listeners were registered.
        for (GroupMember groupMember : this.group.members()) {
            if (!isLocal(groupMember.id())) {
                wALManager.addNode(buildNode(groupMember.id()));
            }
        }
        wALManager.resume();

        try {
            DistributedValue<Object> routeTable = getAtomix().<Object>getValue(TABLE).get();
            routeTable.onChange(changeEvent -> wALManager.setRouteTable(changeEvent.newValue()));
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            logger.log(Level.SEVERE, "Error updating route table on node " + this.address + ":" + this.port, e);
        } catch (ExecutionException e) {
            logger.log(Level.SEVERE, "Error updating route table on node " + this.address + ":" + this.port, e);
        }
    }

    /**
     * @return the leader flag last set by the election callback
     */
    @Override
    public boolean isCoordinator() {
        return isLeader;
    }

    /**
     * @return the Atomix replica created by {@code init}, or {@code null} if
     *         {@code init} has not assigned one yet
     */
    public AtomixReplica getAtomix() {
        return atomix;
    }

    /**
     * @return the bootstrap flag parsed from the configuration in {@code init}
     */
    @Override
    public boolean isBootstrap() {
        return isBootstrap;
    }

    /**
     * Tells whether a group member id of the form {@code "host:port"} refers to
     * this node.
     *
     * <p>The host is compared case-insensitively with the local address and the
     * port numerically with the local port. Ids that are {@code null}, lack a
     * {@code ':'} separator, or carry a non-numeric port are treated as not
     * local instead of raising an exception.
     *
     * @param str group member id
     * @return {@code true} only when both host and port match this node
     */
    public boolean isLocal(String str) {
        if (str == null) {
            return false;
        }
        String[] parts = str.split(":");
        if (parts.length < 2) {
            return false;
        }
        try {
            return parts[0].equalsIgnoreCase(this.address) && Integer.parseInt(parts[1]) == this.port;
        } catch (NumberFormatException e) {
            // A non-numeric port cannot match the local port.
            return false;
        }
    }

    /**
     * Looks up the controller group and counts its current members.
     *
     * @return number of members reported by the group
     * @throws Exception declared by the overridden method
     */
    @Override
    public int getClusterSize() throws Exception {
        DistributedGroup controllerGroup = getAtomix().getGroup(BROADCAST_GROUP).join();
        return controllerGroup.members().size();
    }

    /**
     * Reads the routing table from the shared Atomix value, retrying on
     * failure.
     *
     * <p>Each attempt waits up to 2 seconds for the value handle and then reads
     * its content. After a failed attempt the method sleeps for the configured
     * fetch retry interval before trying again.
     *
     * @param i number of retries allowed after the first attempt; values
     *          {@code <= 0} mean a single attempt
     * @return the routing table, or {@code null} if every attempt failed or the
     *         thread was interrupted (the interrupt flag is restored in that case)
     */
    @Override
    public Object fetchRoutingTable(int i) {
        int retriesLeft = i;
        while (true) {
            try {
                logger.info("Fetching route table info from metastore");
                DistributedValue<Object> value = getAtomix().<Object>getValue(TABLE).get(2L, TimeUnit.SECONDS);
                // Read once and log the value itself rather than the pending future.
                Object table = value.get().get();
                logger.info("Fetched route table info from metastore:" + table);
                return table;
            } catch (Exception e) {
                logger.log(Level.SEVERE, "Failed to fetch routing table on node " + this.address + ":" + this.port + " reason:" + e.getMessage(), e);
                if (e instanceof InterruptedException) {
                    // Do not retry after an interrupt; hand the signal back to the caller.
                    Thread.currentThread().interrupt();
                    return null;
                }
                if (retriesLeft <= 0) {
                    return null;
                }
                logger.info("Failed to fetch routing table reason:" + e.getMessage() + " will retry in " + this.fetchRetryInterval + "ms");
            }
            try {
                Thread.sleep(this.fetchRetryInterval);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
            retriesLeft--;
        }
    }

    /**
     * Stores the given routing table in the shared Atomix value.
     *
     * <p>Blocks until the value handle is available; the future returned by
     * {@code set} is not awaited.
     *
     * @param obj routing table to publish
     * @throws Exception if obtaining the value handle fails
     */
    @Override
    public void updateTable(Object obj) throws Exception {
        DistributedValue<Object> routeTable = getAtomix().<Object>getValue(TABLE).get();
        routeTable.set(obj);
        logger.fine("Updated route table in atomix");
    }

    /**
     * @return the node recorded by the most recent election callback, or
     *         {@code null} if no election has been observed yet
     */
    @Override
    public Node getCoordinator() {
        return coordinator;
    }

    /**
     * Leaves the cluster and shuts the replica down.
     *
     * <p>Waits for the leave to complete. Shutdown is requested even when
     * leaving fails, so the replica is not left running after an error; the
     * shutdown future itself is not awaited. Does nothing if {@code init} never
     * created a replica.
     *
     * @throws Exception if leaving the cluster fails
     */
    @Override
    public void stop() throws Exception {
        if (this.atomix == null) {
            return;
        }
        try {
            this.atomix.leave().get();
        } finally {
            this.atomix.shutdown();
        }
    }
}
