package io.ray.streaming.api.context;

import com.google.common.base.Preconditions;
import io.ray.api.Ray;
import io.ray.streaming.api.stream.StreamSink;
import io.ray.streaming.client.JobClient;
import io.ray.streaming.jobgraph.JobGraph;
import io.ray.streaming.jobgraph.JobGraphBuilder;
import io.ray.streaming.jobgraph.JobGraphOptimizer;
import io.ray.streaming.util.Config;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/api/context/StreamingContext.class */
/**
 * Entry point for assembling and launching a streaming job.
 *
 * <p>A context collects the job's {@link StreamSink}s and its string configuration.
 * {@link #execute(String)} builds and optimizes a {@link JobGraph} from the registered sinks and
 * submits it through a {@link JobClient} discovered with {@link ServiceLoader}.
 *
 * <p>The class is {@link Serializable}. The id counter is {@code transient}; it is re-created,
 * starting again from zero, whenever an instance is deserialized.
 */
public class StreamingContext implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingContext.class);

    /** Backs {@link #generateId()}; excluded from the serialized form. */
    private transient AtomicInteger idGenerator = new AtomicInteger(0);

    /** Sinks registered through {@link #addSink(StreamSink)}; handed to the job graph builder. */
    private List<StreamSink> streamSinks = new ArrayList<>();

    /** Job configuration; read when starting a cluster and passed to the job client. */
    private Map<String, String> jobConfig = new HashMap<>();

    /** Optimized job graph produced by the most recent {@link #execute(String)} call. */
    private JobGraph jobGraph;

    /** Instances are created through {@link #buildContext()}. */
    private StreamingContext() {
    }

    /**
     * Creates a new context with no sinks and an empty configuration.
     *
     * @return a fresh {@code StreamingContext}
     */
    public static StreamingContext buildContext() {
        return new StreamingContext();
    }

    /**
     * Builds the job graph from the registered sinks, makes sure a Ray cluster is available and
     * submits the job.
     *
     * @param str the job name; passed to the {@link JobGraphBuilder} and used in log messages
     * @throws IllegalArgumentException if no {@link JobClient} implementation can be found
     */
    public void execute(String str) {
        this.jobGraph = new JobGraphOptimizer(new JobGraphBuilder(this.streamSinks, str).build()).optimize();
        this.jobGraph.printJobGraph();
        LOG.info("JobGraph digraph\n{}", this.jobGraph.generateDigraph());

        ensureCluster(str);
        loadJobClient().submit(this.jobGraph, this.jobConfig);
    }

    /**
     * Reuses the current Ray runtime if one is initialized; otherwise starts a cluster and
     * registers a JVM shutdown hook that calls {@link #stop()}.
     */
    private void ensureCluster(String jobName) {
        if (Ray.isInitialized()) {
            LOG.info("Reuse existing cluster.");
            return;
        }

        // The configured channel type selects the flag passed to ClusterStarter:
        // true for the memory channel, false otherwise.
        boolean memoryChannel =
                Config.MEMORY_CHANNEL.equalsIgnoreCase(this.jobConfig.get(Config.CHANNEL_TYPE));
        ClusterStarter.startCluster(memoryChannel);
        if (memoryChannel) {
            LOG.info("Created local cluster for job {}.", jobName);
        } else {
            LOG.info("Created multi process cluster for job {}.", jobName);
        }

        // Only a cluster started here gets the shutdown hook; a reused one is left alone.
        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
    }

    /** Returns the first {@link JobClient} provider reported by {@link ServiceLoader}. */
    private static JobClient loadJobClient() {
        Iterator<JobClient> clients = ServiceLoader.load(JobClient.class).iterator();
        Preconditions.checkArgument(clients.hasNext(), "No JobClient implementation has been provided.");
        return clients.next();
    }

    /**
     * Returns the next id from this context's counter.
     *
     * @return the incremented counter value; the first call on a new context returns 1
     */
    public int generateId() {
        return this.idGenerator.incrementAndGet();
    }

    /**
     * Registers a sink to be included when the job graph is built.
     *
     * @param streamSink the sink to add
     */
    public void addSink(StreamSink streamSink) {
        this.streamSinks.add(streamSink);
    }

    /**
     * Returns the registered sinks.
     *
     * @return the internal list itself, not a copy; changes to it affect this context
     */
    public List<StreamSink> getStreamSinks() {
        return this.streamSinks;
    }

    /**
     * Replaces the job configuration.
     *
     * <p>The map is stored by reference, not copied. No null check is performed here, but
     * {@link #execute(String)} dereferences the configuration when it has to start a cluster,
     * so a {@code null} map will fail there.
     *
     * @param map the new job configuration
     */
    public void withConfig(Map<String, String> map) {
        this.jobConfig = map;
    }

    /** Stops the cluster through {@code ClusterStarter} if Ray is currently initialized. */
    public void stop() {
        if (Ray.isInitialized()) {
            ClusterStarter.stopCluster();
        }
    }

    /**
     * Restores the serialized fields and re-creates the transient id counter.
     *
     * <p>Field initializers do not run during deserialization, so without this hook
     * {@code idGenerator} would be {@code null} on a deserialized instance and
     * {@link #generateId()} would throw a {@link NullPointerException}. The counter restarts
     * from zero because its previous value is not part of the serialized form.
     *
     * @param in the stream to read this object from
     * @throws IOException if reading from the stream fails
     * @throws ClassNotFoundException if the class of a serialized field cannot be found
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.idGenerator = new AtomicInteger(0);
    }
}
