package org.apache.samza.container;

import java.io.File;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import org.apache.samza.checkpoint.OffsetManager;
import org.apache.samza.config.Config;
import org.apache.samza.config.ShellCommandConfig$;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.storage.TaskStorageManager;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemProducers;
import org.apache.samza.task.AsyncStreamTaskAdapter;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskInstanceCollector;
import org.apache.samza.util.Util$;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.runtime.AbstractFunction1;
import scala.runtime.ObjectRef;

/* compiled from: SamzaContainer.scala */
/* loaded from: input_file:org/apache/samza/container/SamzaContainer$$anonfun$34.class */
/**
 * Compiler-generated closure (from {@code SamzaContainer.scala}) that maps a
 * {@link TaskModel} to a {@code (TaskName, TaskInstance)} pair.
 *
 * <p>All state is captured once in the constructor; {@link #apply(TaskModel)}
 * instantiates the configured task class, wires up its metrics, collector and
 * storage manager, and returns the resulting task instance keyed by task name.
 */
public class SamzaContainer$$anonfun$34 extends AbstractFunction1<TaskModel, Tuple2<TaskName, TaskInstance<Object>>> implements Serializable {
    public static final long serialVersionUID = 0;
    private final JobModel jobModel$1;
    public final Config config$2;
    public final SamzaContainerMetrics samzaContainerMetrics$1;
    public final Map systemFactories$1;
    private final Map systemAdmins$1;
    private final StreamMetadataCache streamMetadataCache$1;
    private final String taskClassName$1;
    private final boolean isAsyncTask$1;
    public final Map serdes$1;
    public final Map changeLogSystemStreams$1;
    private final Map reporters$1;
    private final OffsetManager offsetManager$1;
    private final SystemConsumers consumerMultiplexer$1;
    private final SystemProducers producerMultiplexer$1;
    private final Map storageEngineFactories$1;
    private final boolean singleThreadMode$1;
    private final ExecutorService taskThreadPool$1;
    public final SamzaContainerContext containerContext$1;
    public final File defaultStoreBaseDir$1;
    private final HashSet storeWatchPaths$1;

    /**
     * Builds the task instance for one task model.
     *
     * @param taskModel the model describing the task to instantiate
     * @return a tuple of the task's name and its newly created {@code TaskInstance}
     */
    public final Tuple2<TaskName, TaskInstance<Object>> apply(TaskModel taskModel) {
        SamzaContainer$.MODULE$.debug(new SamzaContainer$$anonfun$34$$anonfun$apply$41(this, taskModel));
        TaskName taskName = taskModel.getTaskName();

        // Reflectively create the user task; it is wrapped in an AsyncStreamTaskAdapter
        // only when the container is neither single-threaded nor running an async task.
        Object rawTask = Class.forName(this.taskClassName$1).newInstance();
        Object task;
        if (!this.singleThreadMode$1 && !this.isAsyncTask$1) {
            task = new AsyncStreamTaskAdapter((StreamTask) rawTask, this.taskThreadPool$1);
        } else {
            task = rawTask;
        }

        // Per-task metrics (source name "TaskName-<taskName>") and the collector that reports into them.
        String metricsSource = new StringOps(Predef$.MODULE$.augmentString("TaskName-%s"))
                .format(Predef$.MODULE$.genericWrapArray(new Object[]{taskName}));
        TaskInstanceMetrics metrics = new TaskInstanceMetrics(metricsSource, TaskInstanceMetrics$.MODULE$.$lessinit$greater$default$2());
        TaskInstanceCollector collector = new TaskInstanceCollector(this.producerMultiplexer$1, metrics);

        // One entry per changelog system stream, produced by the generated closure $anonfun$35.
        Map mappedChangeLogs = (Map) this.changeLogSystemStreams$1.map(
                new SamzaContainer$$anonfun$34$$anonfun$35(this, taskName, metrics),
                Map$.MODULE$.canBuildFrom());
        Map storeConsumers = mappedChangeLogs.toMap(Predef$.MODULE$.conforms());
        SamzaContainer$.MODULE$.info(new SamzaContainer$$anonfun$34$$anonfun$apply$42(this, storeConsumers));

        // Logged-store base directory: "<env value>/<jobName>-<jobId>" when the environment
        // variable is set, otherwise a warning is emitted and the default store dir is used.
        File loggedStoreBaseDir;
        if (System.getenv(ShellCommandConfig$.MODULE$.ENV_LOGGED_STORE_BASE_DIR()) != null) {
            Tuple2<String, String> jobNameAndId = Util$.MODULE$.getJobNameAndId(this.config$2);
            loggedStoreBaseDir = new File(System.getenv(ShellCommandConfig$.MODULE$.ENV_LOGGED_STORE_BASE_DIR())
                    + File.separator + jobNameAndId._1() + "-" + jobNameAndId._2());
        } else {
            SamzaContainer$.MODULE$.warn(new SamzaContainer$$anonfun$34$$anonfun$apply$43(this));
            loggedStoreBaseDir = this.defaultStoreBaseDir$1;
        }
        // The generated closures below take the directory through an ObjectRef holder.
        ObjectRef loggedStoreBaseDirRef = new ObjectRef(loggedStoreBaseDir);
        this.storeWatchPaths$1.add(loggedStoreBaseDir.toPath());
        SamzaContainer$.MODULE$.info(new SamzaContainer$$anonfun$34$$anonfun$apply$44(this, loggedStoreBaseDirRef));

        // One entry per storage engine factory, produced by the generated closure $anonfun$37.
        Map taskStores = (Map) this.storageEngineFactories$1.map(
                new SamzaContainer$$anonfun$34$$anonfun$37(this, taskModel, taskName, metrics, collector, loggedStoreBaseDirRef),
                Map$.MODULE$.canBuildFrom());
        SamzaContainer$.MODULE$.info(new SamzaContainer$$anonfun$34$$anonfun$apply$45(this, taskStores));

        TaskStorageManager storageManager = new TaskStorageManager(
                taskName,
                taskStores,
                storeConsumers,
                this.changeLogSystemStreams$1,
                this.jobModel$1.maxChangeLogStreamPartitions,
                this.streamMetadataCache$1,
                this.defaultStoreBaseDir$1,
                (File) loggedStoreBaseDirRef.elem,
                taskModel.getChangelogPartition(),
                this.systemAdmins$1);

        // Immutable Scala view of the system stream partitions assigned to this task.
        Set systemStreamPartitions = JavaConversions$.MODULE$.asScalaSet(taskModel.getSystemStreamPartitions()).toSet();
        SamzaContainer$.MODULE$.info(new SamzaContainer$$anonfun$34$$anonfun$apply$46(this, taskName, systemStreamPartitions));

        return new Tuple2<>(taskName, createTaskInstance$1(task, taskName, metrics, collector, storageManager, systemStreamPartitions));
    }

    /**
     * Assembles a {@code TaskInstance} from the per-task pieces built in
     * {@link #apply(TaskModel)} plus the container-wide state captured by this closure.
     */
    private final TaskInstance createTaskInstance$1(Object task, TaskName name, TaskInstanceMetrics metrics, TaskInstanceCollector collector, TaskStorageManager storageManager, Set systemStreamPartitions) {
        return new TaskInstance(
                task,
                name,
                this.config$2,
                metrics,
                this.systemAdmins$1,
                this.consumerMultiplexer$1,
                collector,
                this.containerContext$1,
                this.offsetManager$1,
                storageManager,
                this.reporters$1,
                systemStreamPartitions,
                TaskInstanceExceptionHandler$.MODULE$.apply(metrics, this.config$2));
    }

    /**
     * Captures the enclosing scope's values; each argument is stored unchanged
     * in the correspondingly named field.
     */
    public SamzaContainer$$anonfun$34(JobModel jobModel, Config config, SamzaContainerMetrics samzaContainerMetrics, Map map, Map map2, StreamMetadataCache streamMetadataCache, String str, boolean z, Map map3, Map map4, Map map5, OffsetManager offsetManager, SystemConsumers systemConsumers, SystemProducers systemProducers, Map map6, boolean z2, ExecutorService executorService, SamzaContainerContext samzaContainerContext, File file, HashSet hashSet) {
        // Job, config and container-level context.
        this.jobModel$1 = jobModel;
        this.config$2 = config;
        this.samzaContainerMetrics$1 = samzaContainerMetrics;
        this.containerContext$1 = samzaContainerContext;
        this.reporters$1 = map5;

        // Task class and threading mode.
        this.taskClassName$1 = str;
        this.isAsyncTask$1 = z;
        this.singleThreadMode$1 = z2;
        this.taskThreadPool$1 = executorService;

        // Systems, serdes and stream plumbing.
        this.systemFactories$1 = map;
        this.systemAdmins$1 = map2;
        this.streamMetadataCache$1 = streamMetadataCache;
        this.serdes$1 = map3;
        this.offsetManager$1 = offsetManager;
        this.consumerMultiplexer$1 = systemConsumers;
        this.producerMultiplexer$1 = systemProducers;

        // Storage.
        this.changeLogSystemStreams$1 = map4;
        this.storageEngineFactories$1 = map6;
        this.defaultStoreBaseDir$1 = file;
        this.storeWatchPaths$1 = hashSet;
    }
}
