package io.ray.streaming.jobgraph;

import io.ray.streaming.api.Language;
import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.api.partition.impl.ForwardPartition;
import io.ray.streaming.api.partition.impl.RoundRobinPartition;
import io.ray.streaming.operator.ChainStrategy;
import io.ray.streaming.operator.chain.ChainedOperator;
import io.ray.streaming.python.PythonOperator;
import io.ray.streaming.python.PythonPartition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

/* loaded from: input_file:io/ray/streaming/jobgraph/JobGraphOptimizer.class */
public class JobGraphOptimizer {
    private final JobGraph jobGraph;
    private Map<Integer, JobVertex> vertexMap;
    private Map<JobVertex, Set<JobEdge>> outputEdgesMap;
    private Set<JobVertex> visited = new HashSet();
    private Map<Integer, Pair<JobVertex, List<JobVertex>>> mergedVertexMap = new HashMap();

    public JobGraphOptimizer(JobGraph jobGraph) {
        this.jobGraph = jobGraph;
        this.vertexMap = (Map) jobGraph.getJobVertices().stream().collect(Collectors.toMap((v0) -> {
            return v0.getVertexId();
        }, Function.identity()));
        this.outputEdgesMap = (Map) this.vertexMap.keySet().stream().collect(Collectors.toMap(num -> {
            return this.vertexMap.get(num);
        }, num2 -> {
            return new HashSet(jobGraph.getVertexOutputEdges(num2.intValue()));
        }));
    }

    public JobGraph optimize() {
        this.jobGraph.getSourceVertices().forEach(jobVertex -> {
            ArrayList arrayList = new ArrayList();
            arrayList.add(jobVertex);
            mergeVerticesRecursively(jobVertex, arrayList);
        });
        return new JobGraph(this.jobGraph.getJobName(), this.jobGraph.getJobConfig(), (List) this.mergedVertexMap.values().stream().map((v0) -> {
            return v0.getLeft();
        }).collect(Collectors.toList()), createEdges());
    }

    private void mergeVerticesRecursively(JobVertex jobVertex, List<JobVertex> list) {
        if (this.visited.contains(jobVertex)) {
            return;
        }
        this.visited.add(jobVertex);
        Set<JobEdge> set = this.outputEdgesMap.get(jobVertex);
        if (set.isEmpty()) {
            mergeAndAddVertex(list);
        } else {
            set.forEach(jobEdge -> {
                JobVertex jobVertex2 = this.vertexMap.get(Integer.valueOf(jobEdge.getTargetVertexId()));
                if (canBeChained(jobVertex, jobVertex2, jobEdge)) {
                    list.add(jobVertex2);
                    mergeVerticesRecursively(jobVertex2, list);
                } else {
                    mergeAndAddVertex(list);
                    ArrayList arrayList = new ArrayList();
                    arrayList.add(jobVertex2);
                    mergeVerticesRecursively(jobVertex2, arrayList);
                }
            });
        }
    }

    private void mergeAndAddVertex(List<JobVertex> list) {
        JobVertex jobVertex;
        JobVertex jobVertex2 = list.get(0);
        Language language = jobVertex2.getLanguage();
        if (list.size() == 1) {
            jobVertex = jobVertex2;
        } else {
            List list2 = (List) list.stream().map(jobVertex3 -> {
                return this.vertexMap.get(Integer.valueOf(jobVertex3.getVertexId())).getStreamOperator();
            }).collect(Collectors.toList());
            List list3 = (List) list.stream().map(jobVertex4 -> {
                return this.vertexMap.get(Integer.valueOf(jobVertex4.getVertexId())).getConfig();
            }).collect(Collectors.toList());
            jobVertex = new JobVertex(jobVertex2.getVertexId(), jobVertex2.getParallelism(), jobVertex2.getVertexType(), language == Language.JAVA ? ChainedOperator.newChainedOperator(list2, list3) : new PythonOperator.ChainedPythonOperator((List) list2.stream().map(streamOperator -> {
                return (PythonOperator) streamOperator;
            }).collect(Collectors.toList()), list3), new HashMap());
        }
        this.mergedVertexMap.put(Integer.valueOf(jobVertex.getVertexId()), Pair.of(jobVertex, list));
    }

    private List<JobEdge> createEdges() {
        ArrayList arrayList = new ArrayList();
        this.mergedVertexMap.forEach((num, pair) -> {
            JobVertex jobVertex = (JobVertex) pair.getLeft();
            List list = (List) pair.getRight();
            JobVertex jobVertex2 = (JobVertex) list.get(list.size() - 1);
            if (this.outputEdgesMap.containsKey(jobVertex2)) {
                this.outputEdgesMap.get(jobVertex2).forEach(jobEdge -> {
                    Pair<JobVertex, List<JobVertex>> pair = this.mergedVertexMap.get(Integer.valueOf(jobEdge.getTargetVertexId()));
                    arrayList.add(new JobEdge(jobVertex.getVertexId(), ((JobVertex) pair.getLeft()).getVertexId(), changePartition(jobEdge.getPartition())));
                });
            }
        });
        return arrayList;
    }

    private Partition changePartition(Partition partition) {
        if (!(partition instanceof PythonPartition)) {
            return partition instanceof ForwardPartition ? new RoundRobinPartition() : partition;
        }
        PythonPartition pythonPartition = (PythonPartition) partition;
        return (pythonPartition.isConstructedFromBinary() || !pythonPartition.getFunctionName().equals(PythonPartition.FORWARD_PARTITION_CLASS)) ? partition : PythonPartition.RoundRobinPartition;
    }

    private boolean canBeChained(JobVertex jobVertex, JobVertex jobVertex2, JobEdge jobEdge) {
        if (this.jobGraph.getVertexOutputEdges(jobVertex.getVertexId()).size() > 1 || this.jobGraph.getVertexInputEdges(jobVertex2.getVertexId()).size() > 1 || jobVertex.getParallelism() != jobVertex2.getParallelism() || jobVertex.getStreamOperator().getChainStrategy() == ChainStrategy.NEVER || jobVertex2.getStreamOperator().getChainStrategy() == ChainStrategy.NEVER || jobVertex2.getStreamOperator().getChainStrategy() == ChainStrategy.HEAD || jobVertex.getLanguage() != jobVertex2.getLanguage()) {
            return false;
        }
        Partition partition = jobEdge.getPartition();
        if (!(partition instanceof PythonPartition)) {
            return partition instanceof ForwardPartition;
        }
        PythonPartition pythonPartition = (PythonPartition) partition;
        return !pythonPartition.isConstructedFromBinary() && pythonPartition.getFunctionName().equals(PythonPartition.FORWARD_PARTITION_CLASS);
    }
}
