package com.srotya.sidewinder.core.rpc;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorThreeArg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.srotya.sidewinder.core.rpc.WriterServiceGrpc;
import com.srotya.sidewinder.core.storage.StorageEngine;
import com.srotya.sidewinder.core.storage.TimeSeries;
import com.srotya.sidewinder.core.storage.compression.Writer;
import com.srotya.sidewinder.core.utils.BackgrounThreadFactory;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/srotya/sidewinder/core/rpc/WriterServiceImpl.class */
public class WriterServiceImpl extends WriterServiceGrpc.WriterServiceImplBase {
    private RingBuffer<DPWrapper> buffer;
    private Disruptor<DPWrapper> disruptor;
    private StorageEngine engine;
    private Map<String, String> conf;
    private int handlerCount;
    private ExecutorService es;
    private boolean disruptorEnable;
    private DataPointTranslator translator;

    /* loaded from: input_file:com/srotya/sidewinder/core/rpc/WriterServiceImpl$DPWrapper.class */
    public static class DPWrapper {
        private long messageId;
        private Point dp;
        private StreamObserver<Ack> responseObserver;
        private int hashValue;

        public long getMessageId() {
            return this.messageId;
        }

        public void setMessageId(long j) {
            this.messageId = j;
        }

        public Point getDp() {
            return this.dp;
        }

        public void setDp(Point point) {
            this.dp = point;
        }

        public StreamObserver<Ack> getResponseObserver() {
            return this.responseObserver;
        }

        public void setResponseObserver(StreamObserver<Ack> streamObserver) {
            this.responseObserver = streamObserver;
        }

        public int getHashValue() {
            return this.hashValue;
        }

        public void setHashValue(int i) {
            this.hashValue = i;
        }
    }

    /* loaded from: input_file:com/srotya/sidewinder/core/rpc/WriterServiceImpl$DPWrapperFactory.class */
    public static class DPWrapperFactory implements EventFactory<DPWrapper> {
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // com.lmax.disruptor.EventFactory
        public DPWrapper newInstance() {
            return new DPWrapper();
        }
    }

    /* loaded from: input_file:com/srotya/sidewinder/core/rpc/WriterServiceImpl$DataPointTranslator.class */
    public static class DataPointTranslator implements EventTranslatorThreeArg<DPWrapper, Point, Long, StreamObserver<Ack>> {
        @Override // com.lmax.disruptor.EventTranslatorThreeArg
        public void translateTo(DPWrapper dPWrapper, long j, Point point, Long l, StreamObserver<Ack> streamObserver) {
            dPWrapper.setDp(point);
            dPWrapper.setMessageId(l.longValue());
            dPWrapper.setResponseObserver(streamObserver);
        }
    }

    /* loaded from: input_file:com/srotya/sidewinder/core/rpc/WriterServiceImpl$HashHandler.class */
    public static class HashHandler implements EventHandler<DPWrapper> {
        @Override // com.lmax.disruptor.EventHandler
        public void onEvent(DPWrapper dPWrapper, long j, boolean z) throws Exception {
            StringBuilder sb = new StringBuilder();
            Point dp = dPWrapper.getDp();
            sb.append(dp.getDbName() + "\n");
            sb.append(dp.getMeasurementName() + "\n");
            sb.append(dp.getValueFieldName() + "\n");
            sb.append(dp.getTagsList().toString() + "\n");
            dPWrapper.setHashValue(sb.toString().hashCode());
        }
    }

    /* loaded from: input_file:com/srotya/sidewinder/core/rpc/WriterServiceImpl$WriteHandler.class */
    public static class WriteHandler implements EventHandler<DPWrapper> {
        private StorageEngine engine;
        private int handlerCount;
        private int handlerIndex;

        public WriteHandler(StorageEngine storageEngine, int i, int i2) {
            this.engine = storageEngine;
            this.handlerCount = i;
            this.handlerIndex = i2;
        }

        @Override // com.lmax.disruptor.EventHandler
        public void onEvent(DPWrapper dPWrapper, long j, boolean z) throws Exception {
            Ack build;
            if (dPWrapper.getHashValue() % this.handlerCount == this.handlerIndex) {
                try {
                    this.engine.writeDataPoint(dPWrapper.getDp());
                    build = Ack.newBuilder().setMessageId(dPWrapper.getMessageId()).setResponseCode(200).build();
                } catch (IOException e) {
                    build = Ack.newBuilder().setMessageId(dPWrapper.getMessageId()).setResponseCode(400).build();
                } catch (Exception e2) {
                    build = Ack.newBuilder().setMessageId(dPWrapper.getMessageId()).setResponseCode(500).build();
                }
                if (dPWrapper.getResponseObserver() != null) {
                    dPWrapper.getResponseObserver().onNext(build);
                    dPWrapper.getResponseObserver().onCompleted();
                }
            }
        }
    }

    /* loaded from: input_file:com/srotya/sidewinder/core/rpc/WriterServiceImpl$WriteStreamObserver.class */
    public static class WriteStreamObserver implements StreamObserver<SingleData> {
        private RingBuffer<DPWrapper> ring;
        private DataPointTranslator translator = new DataPointTranslator();
        private StreamObserver<Ack> responseObserver;

        public WriteStreamObserver(RingBuffer<DPWrapper> ringBuffer, StreamObserver<Ack> streamObserver) {
            this.ring = ringBuffer;
            this.responseObserver = streamObserver;
        }

        @Override // io.grpc.stub.StreamObserver
        public void onNext(SingleData singleData) {
            this.ring.publishEvent(this.translator, singleData.getPoint(), Long.valueOf(singleData.getMessageId()), this.responseObserver);
        }

        @Override // io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
        }

        @Override // io.grpc.stub.StreamObserver
        public void onCompleted() {
        }
    }

    public WriterServiceImpl(StorageEngine storageEngine, Map<String, String> map) {
        this.engine = storageEngine;
        this.conf = map;
        this.disruptorEnable = Boolean.parseBoolean(map.getOrDefault("grpc.disruptor.enabled", "false"));
        if (this.disruptorEnable) {
            int parseInt = Integer.parseInt(map.getOrDefault("grpc.disruptor.buffer.size", "16384"));
            if (parseInt % 2 != 0) {
                throw new IllegalArgumentException("Disruptor buffers must always be power of 2");
            }
            this.translator = new DataPointTranslator();
            this.handlerCount = Integer.parseInt(map.getOrDefault("grpc.disruptor.handler.count", "2"));
            this.es = Executors.newFixedThreadPool(this.handlerCount + 2, new BackgrounThreadFactory("grpc-writers"));
            this.disruptor = new Disruptor<>(new DPWrapperFactory(), parseInt, this.es);
            EventHandler<? super DPWrapper>[] eventHandlerArr = new EventHandler[this.handlerCount];
            for (int i = 0; i < this.handlerCount; i++) {
                eventHandlerArr[i] = new WriteHandler(storageEngine, this.handlerCount, i);
            }
            this.disruptor.handleEventsWith(new HashHandler()).then(eventHandlerArr);
            this.buffer = this.disruptor.start();
        }
    }

    @Override // com.srotya.sidewinder.core.rpc.WriterServiceGrpc.WriterServiceImplBase
    public void writeSingleDataPoint(SingleData singleData, StreamObserver<Ack> streamObserver) {
        Ack build;
        Point point = singleData.getPoint();
        try {
            if (this.disruptorEnable) {
                this.buffer.publishEvent(this.translator, point, Long.valueOf(point.getTimestamp()), null);
            } else if (point.getFp()) {
                this.engine.writeDataPoint(point.getDbName(), point.getMeasurementName(), point.getValueFieldName(), (List<String>) new ArrayList(point.getTagsList()), point.getTimestamp(), Double.doubleToLongBits(point.getValue()));
            } else {
                this.engine.writeDataPoint(point.getDbName(), point.getMeasurementName(), point.getValueFieldName(), (List<String>) new ArrayList(point.getTagsList()), point.getTimestamp(), point.getValue());
            }
            build = Ack.newBuilder().setMessageId(singleData.getMessageId()).setResponseCode(200).build();
        } catch (Exception e) {
            build = Ack.newBuilder().setMessageId(singleData.getMessageId()).setResponseCode(500).build();
        }
        streamObserver.onNext(build);
        streamObserver.onCompleted();
    }

    @Override // com.srotya.sidewinder.core.rpc.WriterServiceGrpc.WriterServiceImplBase
    public void writeBatchDataPoint(BatchData batchData, StreamObserver<Ack> streamObserver) {
        Ack build;
        try {
            List<Point> pointsList = batchData.getPointsList();
            for (int i = 0; i < pointsList.size(); i++) {
                Point point = pointsList.get(i);
                if (this.disruptorEnable) {
                    this.buffer.publishEvent(this.translator, point, Long.valueOf(point.getTimestamp()), null);
                } else if (point.getFp()) {
                    this.engine.writeDataPoint(point.getDbName(), point.getMeasurementName(), point.getValueFieldName(), new ArrayList(point.getTagsList()), point.getTimestamp(), Double.longBitsToDouble(point.getValue()));
                } else {
                    this.engine.writeDataPoint(point.getDbName(), point.getMeasurementName(), point.getValueFieldName(), (List<String>) new ArrayList(point.getTagsList()), point.getTimestamp(), point.getValue());
                }
            }
            build = Ack.newBuilder().setMessageId(batchData.getMessageId()).setResponseCode(200).build();
        } catch (Exception e) {
            e.printStackTrace();
            build = Ack.newBuilder().setMessageId(batchData.getMessageId()).setResponseCode(500).build();
        }
        streamObserver.onNext(build);
        streamObserver.onCompleted();
    }

    @Override // com.srotya.sidewinder.core.rpc.WriterServiceGrpc.WriterServiceImplBase
    public void writeSeriesPoint(RawTimeSeriesBucket rawTimeSeriesBucket, StreamObserver<Ack> streamObserver) {
        Ack build;
        try {
            TimeSeries orCreateTimeSeries = this.engine.getOrCreateTimeSeries(rawTimeSeriesBucket.getDbName(), rawTimeSeriesBucket.getMeasurementName(), rawTimeSeriesBucket.getValueFieldName(), new ArrayList(rawTimeSeriesBucket.getTagsList()), rawTimeSeriesBucket.getBucketSize(), rawTimeSeriesBucket.getFp());
            for (Bucket bucket : rawTimeSeriesBucket.getBucketsList()) {
                Writer orCreateSeriesBucket = orCreateTimeSeries.getOrCreateSeriesBucket(TimeUnit.MILLISECONDS, bucket.getHeaderTimestamp());
                orCreateSeriesBucket.configure(this.conf, null, false, 1, true);
                orCreateSeriesBucket.setCounter(bucket.getCount());
                orCreateSeriesBucket.bootstrap(bucket.getData().asReadOnlyByteBuffer());
            }
            build = Ack.newBuilder().setMessageId(rawTimeSeriesBucket.getMessageId()).setResponseCode(200).build();
        } catch (Exception e) {
            build = Ack.newBuilder().setMessageId(rawTimeSeriesBucket.getMessageId()).setResponseCode(500).build();
        }
        streamObserver.onNext(build);
        streamObserver.onCompleted();
    }

    @Override // com.srotya.sidewinder.core.rpc.WriterServiceGrpc.WriterServiceImplBase
    public StreamObserver<SingleData> writeDataPointStream(StreamObserver<Ack> streamObserver) {
        return new WriteStreamObserver(this.buffer, streamObserver);
    }

    public Disruptor<DPWrapper> getDisruptor() {
        return this.disruptor;
    }

    public ExecutorService getEs() {
        return this.es;
    }
}
