package io.ray.streaming.operator.impl;

import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.context.RuntimeContext;
import io.ray.streaming.api.function.impl.SourceFunction;
import io.ray.streaming.message.Record;
import io.ray.streaming.operator.ChainStrategy;
import io.ray.streaming.operator.OperatorType;
import io.ray.streaming.operator.SourceOperator;
import io.ray.streaming.operator.StreamOperator;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:io/ray/streaming/operator/impl/SourceOperatorImpl.class */
public class SourceOperatorImpl<T> extends StreamOperator<SourceFunction<T>> implements SourceOperator {
    private SourceOperatorImpl<T>.SourceContextImpl sourceContext;

    /* loaded from: input_file:io/ray/streaming/operator/impl/SourceOperatorImpl$SourceContextImpl.class */
    class SourceContextImpl implements SourceFunction.SourceContext<T> {
        private List<Collector> collectors;

        public SourceContextImpl(List<Collector> list) {
            this.collectors = list;
        }

        @Override // io.ray.streaming.api.function.impl.SourceFunction.SourceContext
        public void collect(T t) throws Exception {
            Iterator<Collector> it = this.collectors.iterator();
            while (it.hasNext()) {
                it.next().collect(new Record(t));
            }
        }
    }

    public SourceOperatorImpl(SourceFunction<T> sourceFunction) {
        super(sourceFunction);
        setChainStrategy(ChainStrategy.HEAD);
    }

    @Override // io.ray.streaming.operator.StreamOperator, io.ray.streaming.operator.Operator
    public void open(List<Collector> list, RuntimeContext runtimeContext) {
        super.open(list, runtimeContext);
        this.sourceContext = new SourceContextImpl(list);
        ((SourceFunction) this.function).init(runtimeContext.getParallelism(), runtimeContext.getTaskIndex());
    }

    @Override // io.ray.streaming.operator.SourceOperator
    public void fetch() {
        try {
            ((SourceFunction) this.function).fetch(this.sourceContext);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // io.ray.streaming.operator.SourceOperator
    public SourceFunction.SourceContext getSourceContext() {
        return this.sourceContext;
    }

    @Override // io.ray.streaming.operator.Operator
    public OperatorType getOpType() {
        return OperatorType.SOURCE;
    }
}
