package org.apache.samza.container;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.samza.SamzaException;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.task.AsyncRunLoop;
import org.apache.samza.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;
import scala.collection.immutable.Map;
import scala.runtime.AbstractFunction1;

/* loaded from: input_file:org/apache/samza/container/RunLoopFactory.class */
public class RunLoopFactory {
    private static final Logger log = LoggerFactory.getLogger(RunLoopFactory.class);
    private static final long DEFAULT_WINDOW_MS = -1;
    private static final long DEFAULT_COMMIT_MS = 60000;
    private static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1;

    public static Runnable createRunLoop(Map<TaskName, TaskInstance<?>> map, SystemConsumers systemConsumers, ExecutorService executorService, Executor executor, SamzaContainerMetrics samzaContainerMetrics, TaskConfig taskConfig) {
        long longValue = ((Long) taskConfig.getWindowMs().getOrElse(Utils.defaultValue(-1L))).longValue();
        log.info("Got window milliseconds: " + longValue);
        long longValue2 = ((Long) taskConfig.getCommitMs().getOrElse(Utils.defaultValue(Long.valueOf(DEFAULT_COMMIT_MS)))).longValue();
        log.info("Got commit milliseconds: " + longValue2);
        int count = map.values().count(new AbstractFunction1<TaskInstance<?>, Object>() { // from class: org.apache.samza.container.RunLoopFactory.1
            public Boolean apply(TaskInstance<?> taskInstance) {
                return Boolean.valueOf(taskInstance.isAsyncTask());
            }
        });
        if (count > 0 && count < map.size()) {
            throw new SamzaException("Mixing StreamTask and AsyncStreamTask is not supported");
        }
        if (count == 0) {
            log.info("Run loop in single thread mode.");
            return new RunLoop(map, systemConsumers, samzaContainerMetrics, longValue, longValue2, Utils.defaultClock(), executor);
        }
        Integer num = (Integer) taskConfig.getMaxConcurrency().getOrElse(Utils.defaultValue(1));
        log.info("Got max messages in flight: " + num);
        Long l = (Long) taskConfig.getCallbackTimeoutMs().getOrElse(Utils.defaultValue(-1L));
        log.info("Got callback timeout: " + l);
        log.info("Run loop in asynchronous mode.");
        return new AsyncRunLoop(JavaConversions.asJavaMap(map), executorService, systemConsumers, num.intValue(), longValue, longValue2, l.longValue(), samzaContainerMetrics);
    }
}
