package io.ray.streaming.operator.chain;

import com.google.common.base.Preconditions;
import io.ray.streaming.api.Language;
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.context.RuntimeContext;
import io.ray.streaming.api.function.Function;
import io.ray.streaming.api.function.impl.SourceFunction;
import io.ray.streaming.message.Record;
import io.ray.streaming.operator.OneInputOperator;
import io.ray.streaming.operator.Operator;
import io.ray.streaming.operator.OperatorType;
import io.ray.streaming.operator.SourceOperator;
import io.ray.streaming.operator.StreamOperator;
import io.ray.streaming.operator.TwoInputOperator;
import java.io.Serializable;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/* loaded from: input_file:io/ray/streaming/operator/chain/ChainedOperator.class */
public abstract class ChainedOperator extends StreamOperator<Function> {
    /** Operators of the chain, in the order they were handed to the constructor. */
    protected final List<StreamOperator> operators;
    /** First operator of the chain, i.e. {@code operators.get(0)}. */
    protected final Operator headOperator;
    /** Last operator of the chain, i.e. {@code operators.get(operators.size() - 1)}. */
    protected final Operator tailOperator;
    /** Configs indexed by operator position; served by the proxied runtime context's {@code getConfig}. */
    private final List<Map<String, String>> configs;
    /** Lazily built lookup table indexed by {@code OperatorType} ordinal; filled in by the synthetic method of the same name. */
    private static volatile /* synthetic */ int[] $SWITCH_TABLE$io$ray$streaming$operator$OperatorType;

    /* loaded from: input_file:io/ray/streaming/operator/chain/ChainedOperator$ChainedOneInputOperator.class */
    /**
     * Chain whose head operator is a {@link OneInputOperator}.
     *
     * <p>Records given to this chain are passed straight to the head operator.
     *
     * @param <T> element type of the records accepted by the head operator
     */
    static class ChainedOneInputOperator<T> extends ChainedOperator implements OneInputOperator<T> {
        /** The chain's head operator, viewed through its one-input interface. */
        private final OneInputOperator<T> inputOperator;

        /**
         * @param list operators to chain; the first one must be a {@link OneInputOperator}
         * @param list2 per-operator configs
         * @throws ClassCastException if the head operator is not a {@link OneInputOperator}
         */
        @SuppressWarnings("unchecked")
        ChainedOneInputOperator(List<StreamOperator> list, List<Map<String, String>> list2) {
            super(list, list2);
            // The superclass constructor has already stored the head; only narrow its type here.
            final Operator head = headOperator;
            inputOperator = (OneInputOperator<T>) head;
        }

        /** Delegates the record to the head operator of the chain. */
        @Override
        public void processElement(Record<T> record) throws Exception {
            inputOperator.processElement(record);
        }
    }

    /* loaded from: input_file:io/ray/streaming/operator/chain/ChainedOperator$ChainedSourceOperator.class */
    /**
     * Chain whose head operator is a {@link SourceOperator}.
     *
     * <p>Both source methods are delegated to the head operator.
     *
     * @param <T> element type produced by the head source operator
     */
    static class ChainedSourceOperator<T> extends ChainedOperator implements SourceOperator<T> {
        /** The chain's head operator, viewed through its source interface. */
        private final SourceOperator<T> sourceOperator;

        /**
         * @param list operators to chain; the first one must be a {@link SourceOperator}
         * @param list2 per-operator configs
         * @throws ClassCastException if the head operator is not a {@link SourceOperator}
         */
        @SuppressWarnings("unchecked")
        ChainedSourceOperator(List<StreamOperator> list, List<Map<String, String>> list2) {
            super(list, list2);
            // The superclass constructor has already stored the head; only narrow its type here.
            final Operator head = headOperator;
            sourceOperator = (SourceOperator<T>) head;
        }

        /** Delegates to the head source operator's {@code fetch()}. */
        @Override
        public void fetch() {
            sourceOperator.fetch();
        }

        /** Returns the source context of the head source operator. */
        @Override
        public SourceFunction.SourceContext<T> getSourceContext() {
            final SourceFunction.SourceContext<T> context = sourceOperator.getSourceContext();
            return context;
        }
    }

    /* loaded from: input_file:io/ray/streaming/operator/chain/ChainedOperator$ChainedTwoInputOperator.class */
    /**
     * Chain whose head operator is a {@link TwoInputOperator}.
     *
     * <p>Record pairs given to this chain are passed straight to the head operator.
     *
     * @param <L> element type of the first input
     * @param <R> element type of the second input
     */
    static class ChainedTwoInputOperator<L, R> extends ChainedOperator implements TwoInputOperator<L, R> {
        /** The chain's head operator, viewed through its two-input interface. */
        private final TwoInputOperator<L, R> inputOperator;

        /**
         * @param list operators to chain; the first one must be a {@link TwoInputOperator}
         * @param list2 per-operator configs
         * @throws ClassCastException if the head operator is not a {@link TwoInputOperator}
         */
        @SuppressWarnings("unchecked")
        ChainedTwoInputOperator(List<StreamOperator> list, List<Map<String, String>> list2) {
            super(list, list2);
            // The superclass constructor has already stored the head; only narrow its type here.
            final Operator head = headOperator;
            inputOperator = (TwoInputOperator<L, R>) head;
        }

        /** Delegates both records to the head operator of the chain. */
        @Override
        public void processElement(Record<L> record, Record<R> record2) {
            inputOperator.processElement(record, record2);
        }
    }

    /**
     * Creates a chain from the given operators.
     *
     * @param list operators in chain order; must contain at least two operators, and every
     *     operator after the first must implement {@link OneInputOperator}
     * @param list2 configs looked up by operator index when an operator asks its runtime
     *     context for {@code getConfig}
     * @throws IllegalArgumentException if fewer than two operators are given, or if an operator
     *     after the head is not a {@link OneInputOperator}
     */
    public ChainedOperator(List<StreamOperator> list, List<Map<String, String>> list2) {
        Preconditions.checkArgument(list.size() >= 2, "Need at least two operators to be chained together");
        // Every operator after the head is wrapped as a OneInputOperator when the chain is
        // opened, so reject anything else up front and say which operator is at fault.
        for (StreamOperator streamOperator : list.subList(1, list.size())) {
            Preconditions.checkArgument(
                    streamOperator instanceof OneInputOperator,
                    "Operators after the head of a chain must be OneInputOperator, but got %s",
                    streamOperator);
        }
        this.operators = list;
        this.configs = list2;
        this.headOperator = list.get(0);
        this.tailOperator = list.get(list.size() - 1);
    }

    /**
     * Opens every operator of the chain.
     *
     * <p>Each operator except the last is opened with a single {@code ForwardCollector} that
     * wraps the operator following it; the last operator is opened with the collectors passed
     * in. Every operator receives its own runtime context built by
     * {@code createRuntimeContext} for its index.
     *
     * @param list collectors handed to the tail operator
     * @param runtimeContext context the per-operator contexts are derived from
     */
    @Override // io.ray.streaming.operator.StreamOperator, io.ray.streaming.operator.Operator
    public void open(List<Collector> list, RuntimeContext runtimeContext) {
        // One collector per non-head operator, all created before any operator is opened.
        List<ForwardCollector> forwarders = operators.stream()
                .skip(1L)
                .map(next -> new ForwardCollector((OneInputOperator) next))
                .collect(Collectors.toList());

        final int tailIndex = operators.size() - 1;
        for (int index = 0; index < tailIndex; index++) {
            StreamOperator current = operators.get(index);
            // forwarders.get(index) wraps operator index + 1, i.e. the direct successor.
            List<ForwardCollector> downstream = Collections.singletonList(forwarders.get(index));
            current.open(downstream, createRuntimeContext(runtimeContext, index));
        }

        tailOperator.open(list, createRuntimeContext(runtimeContext, tailIndex));
    }

    /** Returns the operator type reported by the head operator of the chain. */
    @Override // io.ray.streaming.operator.Operator
    public OperatorType getOpType() {
        final OperatorType headType = headOperator.getOpType();
        return headType;
    }

    /** Returns the language reported by the head operator of the chain. */
    @Override // io.ray.streaming.operator.StreamOperator, io.ray.streaming.operator.Operator
    public Language getLanguage() {
        final Language headLanguage = headOperator.getLanguage();
        return headLanguage;
    }

    /**
     * Builds the chain's display name from its operators' names.
     *
     * @return the operator names in chain order, joined by {@code " -> "} and wrapped in
     *     square brackets
     */
    @Override // io.ray.streaming.operator.StreamOperator, io.ray.streaming.operator.Operator
    public String getName() {
        return operators.stream()
                .map(Operator::getName)
                .collect(Collectors.joining(" -> ", "[", "]"));
    }

    /**
     * Returns the chained operators in chain order.
     *
     * @return the same list instance this chain holds (not a copy)
     */
    public List<StreamOperator> getOperators() {
        return operators;
    }

    /** Returns the first operator of the chain. */
    public Operator getHeadOperator() {
        return headOperator;
    }

    /** Returns the last operator of the chain. */
    public Operator getTailOperator() {
        return tailOperator;
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [java.lang.Object[], java.io.Serializable] */
    @Override // io.ray.streaming.operator.StreamOperator, io.ray.streaming.operator.Operator
    public Serializable saveCheckpoint() {
        ?? r0 = new Object[this.operators.size()];
        for (int i = 0; i < this.operators.size(); i++) {
            r0[i] = this.operators.get(i).saveCheckpoint();
        }
        return r0;
    }

    /**
     * Restores every operator in the chain from a checkpoint array.
     *
     * @param serializable an array holding one checkpoint per operator, in chain order
     * @throws ClassCastException if {@code serializable} is not an array of objects, or an
     *     element is not {@link Serializable}
     */
    @Override // io.ray.streaming.operator.StreamOperator, io.ray.streaming.operator.Operator
    public void loadCheckpoint(Serializable serializable) {
        // The array's runtime type may be Object[] rather than Serializable[], and casting the
        // array itself to Serializable[] would then fail. Cast each element instead.
        Object[] checkpoints = (Object[]) serializable;
        for (int i = 0; i < this.operators.size(); i++) {
            this.operators.get(i).loadCheckpoint((Serializable) checkpoints[i]);
        }
    }

    /**
     * Wraps a runtime context so that {@code getConfig} returns the config of one operator.
     *
     * <p>Any method named {@code getConfig} returns {@code configs.get(i)}; every other call
     * is forwarded to {@code runtimeContext}.
     *
     * @param runtimeContext the context all other calls are forwarded to
     * @param i index of the operator whose config is served
     * @return a dynamic proxy implementing {@link RuntimeContext}
     */
    private RuntimeContext createRuntimeContext(RuntimeContext runtimeContext, int i) {
        return (RuntimeContext) Proxy.newProxyInstance(
                runtimeContext.getClass().getClassLoader(),
                new Class<?>[] {RuntimeContext.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("getConfig")) {
                        return this.configs.get(i);
                    }
                    try {
                        return method.invoke(runtimeContext, args);
                    } catch (java.lang.reflect.InvocationTargetException e) {
                        // Rethrow what the delegate actually threw; otherwise callers would see
                        // the reflective wrapper (as an UndeclaredThrowableException).
                        Throwable cause = e.getCause();
                        throw cause != null ? cause : e;
                    }
                });
    }

    /**
     * Creates the chained operator variant that matches the head operator's type.
     *
     * @param list operators in chain order; the first one decides which variant is created
     * @param list2 per-operator configs, passed through to the created chain
     * @return a source, one-input, or two-input chain for {@code SOURCE}, {@code ONE_INPUT},
     *     or {@code TWO_INPUT} respectively
     * @throws IllegalArgumentException if the head operator reports any other type
     */
    public static ChainedOperator newChainedOperator(List<StreamOperator> list, List<Map<String, String>> list2) {
        // Switch on the enum constant itself instead of an ordinal-indexed lookup table.
        OperatorType headType = list.get(0).getOpType();
        switch (headType) {
            case SOURCE:
                return new ChainedSourceOperator<>(list, list2);
            case ONE_INPUT:
                return new ChainedOneInputOperator<>(list, list2);
            case TWO_INPUT:
                return new ChainedTwoInputOperator<>(list, list2);
            default:
                throw new IllegalArgumentException("Unsupported operator type " + headType);
        }
    }

    /**
     * Returns the ordinal-indexed lookup table for {@code OperatorType}, building it on first use.
     *
     * <p>The table maps {@code SOURCE} to 1, {@code ONE_INPUT} to 2 and {@code TWO_INPUT} to 3;
     * any other slot keeps the array default of 0.
     */
    static /* synthetic */ int[] $SWITCH_TABLE$io$ray$streaming$operator$OperatorType() {
        int[] table = $SWITCH_TABLE$io$ray$streaming$operator$OperatorType;
        if (table == null) {
            int[] fresh = new int[OperatorType.valuesCustom().length];
            // Each constant is mapped in its own try block so that a constant missing at
            // runtime (NoSuchFieldError) only leaves its own slot unset.
            try {
                fresh[OperatorType.ONE_INPUT.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                fresh[OperatorType.SOURCE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                fresh[OperatorType.TWO_INPUT.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }
            $SWITCH_TABLE$io$ray$streaming$operator$OperatorType = fresh;
            table = fresh;
        }
        return table;
    }
}
