package io.ray.streaming.jobgraph;

import com.google.common.base.Preconditions;
import io.ray.streaming.api.stream.DataStream;
import io.ray.streaming.api.stream.JoinStream;
import io.ray.streaming.api.stream.Stream;
import io.ray.streaming.api.stream.StreamSink;
import io.ray.streaming.api.stream.StreamSource;
import io.ray.streaming.api.stream.UnionStream;
import io.ray.streaming.operator.StreamOperator;
import io.ray.streaming.python.stream.PythonDataStream;
import io.ray.streaming.python.stream.PythonUnionStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/jobgraph/JobGraphBuilder.class */
public class JobGraphBuilder {
    private static final Logger LOG = LoggerFactory.getLogger(JobGraphBuilder.class);
    private JobGraph jobGraph;
    private AtomicInteger edgeIdGenerator;
    private List<StreamSink> streamSinkList;

    public JobGraphBuilder(List<StreamSink> list) {
        this(list, "job_" + System.currentTimeMillis());
    }

    public JobGraphBuilder(List<StreamSink> list, String str) {
        this(list, str, new HashMap());
    }

    public JobGraphBuilder(List<StreamSink> list, String str, Map<String, String> map) {
        this.jobGraph = new JobGraph(str, map);
        this.streamSinkList = list;
        this.edgeIdGenerator = new AtomicInteger(0);
    }

    public JobGraph build() {
        Iterator<StreamSink> it = this.streamSinkList.iterator();
        while (it.hasNext()) {
            processStream(it.next());
        }
        return this.jobGraph;
    }

    private void processStream(Stream stream) {
        JobVertex jobVertex;
        while (stream.isProxyStream()) {
            LOG.debug("Skip proxy stream {} of id {}", stream, Integer.valueOf(stream.getId()));
            stream = stream.getOriginalStream();
        }
        StreamOperator operator = stream.getOperator();
        Preconditions.checkArgument(stream.getLanguage() == operator.getLanguage(), "Reference stream should be skipped.");
        int id = stream.getId();
        int parallelism = stream.getParallelism();
        Map<String, String> config = stream.getConfig();
        if (stream instanceof StreamSink) {
            jobVertex = new JobVertex(id, parallelism, VertexType.SINK, operator, config);
            Stream inputStream = stream.getInputStream();
            this.jobGraph.addEdge(new JobEdge(inputStream.getId(), id, inputStream.getPartition()));
            processStream(inputStream);
        } else if (stream instanceof StreamSource) {
            jobVertex = new JobVertex(id, parallelism, VertexType.SOURCE, operator, config);
        } else {
            if (!(stream instanceof DataStream) && !(stream instanceof PythonDataStream)) {
                throw new UnsupportedOperationException("Unsupported stream: " + stream);
            }
            jobVertex = new JobVertex(id, parallelism, VertexType.TRANSFORMATION, operator, config);
            Stream inputStream2 = stream.getInputStream();
            this.jobGraph.addEdge(new JobEdge(inputStream2.getId(), id, inputStream2.getPartition()));
            processStream(inputStream2);
            ArrayList<Stream> arrayList = new ArrayList();
            if (stream instanceof UnionStream) {
                arrayList.addAll(((UnionStream) stream).getUnionStreams());
            }
            if (stream instanceof PythonUnionStream) {
                arrayList.addAll(((PythonUnionStream) stream).getUnionStreams());
            }
            for (Stream stream2 : arrayList) {
                this.jobGraph.addEdge(new JobEdge(stream2.getId(), id, stream2.getPartition()));
                processStream(stream2);
            }
            if (stream instanceof JoinStream) {
                DataStream rightStream = ((JoinStream) stream).getRightStream();
                this.jobGraph.addEdge(new JobEdge(rightStream.getId(), id, rightStream.getPartition()));
                processStream(rightStream);
            }
        }
        this.jobGraph.addVertex(jobVertex);
    }

    private int getEdgeId() {
        return this.edgeIdGenerator.incrementAndGet();
    }
}
