package org.apache.samza.job.yarn;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.YarnConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/samza/job/yarn/SamzaTaskManager.class */
/**
 * {@link YarnAppMasterListener} that keeps the job's containers running.
 *
 * <p>On init it asks the container allocator for one container per configured
 * container ID and starts the allocator on its own thread. When a container
 * completes it either records the success, requests a replacement, or - once
 * the configured retry policy is exhausted - flags the application master for
 * shutdown and marks the job as failed.
 */
public class SamzaTaskManager implements YarnAppMasterListener {
    private static final Logger log = LoggerFactory.getLogger(SamzaTaskManager.class);

    /** Sentinel used when a completed YARN container does not match any running Samza container ID. */
    private static final int UNKNOWN_CONTAINER_ID = -1;

    /** Host value handed to the allocator when no specific host is requested. */
    private static final String ANY_HOST = "ANY_HOST";

    // Exit codes mirroring YARN's ContainerExitStatus (SUCCESS, ABORTED, DISKS_FAILED, PREEMPTED).
    private static final int EXIT_SUCCESS = 0;
    private static final int EXIT_ABORTED = -100;
    private static final int EXIT_DISKS_FAILED = -101;
    private static final int EXIT_PREEMPTED = -102;

    private final boolean hostAffinityEnabled;
    private final SamzaAppState state;
    private final JobConfig jobConfig;
    private final YarnConfig yarnConfig;
    private final AbstractContainerAllocator containerAllocator;
    private final Thread allocatorThread;

    /** Set once the retry policy decides the job cannot continue; makes {@link #shouldShutdown()} return true. */
    private boolean tooManyFailedContainers = false;

    /** Most recent failure record (count and timestamp) per Samza container ID. */
    private final Map<Integer, ContainerFailure> containerFailures = new HashMap<Integer, ContainerFailure>();

    /**
     * Builds the task manager and its container allocator (not yet started).
     *
     * @param config            job configuration, wrapped as {@link JobConfig} and {@link YarnConfig}
     * @param samzaAppState     shared application-master state that this class reads and updates
     * @param aMRMClientAsync   YARN client handed to the container allocator
     * @param yarnConfiguration YARN configuration handed to the {@link ContainerUtil}
     */
    public SamzaTaskManager(Config config, SamzaAppState samzaAppState, AMRMClientAsync<AMRMClient.ContainerRequest> aMRMClientAsync, YarnConfiguration yarnConfiguration) {
        this.state = samzaAppState;
        this.jobConfig = new JobConfig(config);
        this.yarnConfig = new YarnConfig(config);
        this.hostAffinityEnabled = this.yarnConfig.getHostAffinityEnabled();

        ContainerUtil containerUtil = new ContainerUtil(config, samzaAppState, yarnConfiguration);
        if (this.hostAffinityEnabled) {
            this.containerAllocator = new HostAwareContainerAllocator(aMRMClientAsync, containerUtil, this.yarnConfig);
        } else {
            this.containerAllocator = new ContainerAllocator(aMRMClientAsync, containerUtil, this.yarnConfig);
        }
        this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
    }

    /**
     * @return true when the retry policy has given up, when every container has
     *         completed, or when the allocator thread is no longer alive
     */
    @Override // org.apache.samza.job.yarn.YarnAppMasterListener
    public boolean shouldShutdown() {
        return this.tooManyFailedContainers || this.state.completedContainers.get() == this.state.containerCount || !this.allocatorThread.isAlive();
    }

    /**
     * Publishes the configured container count to the shared state, requests the
     * initial containers using the job model's locality map, and starts the
     * allocator thread.
     */
    @Override // org.apache.samza.job.yarn.YarnAppMasterListener
    public void onInit() {
        this.state.containerCount = this.jobConfig.getContainerCount();
        this.state.neededContainers.set(this.state.containerCount);
        this.containerAllocator.requestContainers(this.state.jobCoordinator.jobModel().getAllContainerLocality());
        log.info("Starting the container allocator thread");
        this.allocatorThread.start();
    }

    /** No action is taken on reboot. */
    @Override // org.apache.samza.job.yarn.YarnAppMasterListener
    public void onReboot() {
    }

    /**
     * Tells the allocator to stop and waits for its thread to finish. If the wait
     * is interrupted, the interrupt status is restored so callers can observe it.
     */
    @Override // org.apache.samza.job.yarn.YarnAppMasterListener
    public void onShutdown() {
        this.containerAllocator.setIsRunning(false);
        try {
            this.allocatorThread.join();
        } catch (InterruptedException e) {
            log.info("Allocator Thread join() threw an interrupted exception", e);
            // Catching InterruptedException clears the flag; restore it rather than swallowing the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /** Hands a newly allocated YARN container to the allocator. */
    @Override // org.apache.samza.job.yarn.YarnAppMasterListener
    public void onContainerAllocated(Container container) {
        this.containerAllocator.addContainer(container);
    }

    /**
     * Removes the completed container from the running set and dispatches on its
     * exit status: killed/released by YARN, successful, or failed.
     *
     * @param containerStatus completion status reported for the container
     */
    @Override // org.apache.samza.job.yarn.YarnAppMasterListener
    public void onContainerCompleted(ContainerStatus containerStatus) {
        String containerIdStr = ConverterUtils.toString(containerStatus.getContainerId());
        int containerId = removeRunningContainer(containerStatus);
        int exitStatus = containerStatus.getExitStatus();
        switch (exitStatus) {
            case EXIT_PREEMPTED:
            case EXIT_DISKS_FAILED:
            case EXIT_ABORTED:
                handleReleasedContainer(containerId, containerIdStr, exitStatus);
                break;
            case EXIT_SUCCESS:
                handleSuccessfulContainer(containerId, containerIdStr);
                break;
            default:
                handleFailedContainer(containerId, containerIdStr, exitStatus, containerStatus);
                break;
        }
    }

    /**
     * Finds the Samza container ID whose running YARN container matches the
     * completed one and removes it from the running set.
     *
     * @return the Samza container ID, or {@link #UNKNOWN_CONTAINER_ID} if none matched
     */
    private int removeRunningContainer(ContainerStatus containerStatus) {
        int containerId = UNKNOWN_CONTAINER_ID;
        for (Map.Entry<Integer, YarnContainer> entry : this.state.runningContainers.entrySet()) {
            if (entry.getValue().id().equals(containerStatus.getContainerId())) {
                containerId = entry.getKey().intValue();
                break;
            }
        }
        // Remove after the scan (not during it), and only when something actually matched.
        if (containerId != UNKNOWN_CONTAINER_ID) {
            this.state.runningContainers.remove(Integer.valueOf(containerId));
        }
        return containerId;
    }

    /** Counts a container killed by YARN and, if it was running a task group, requests a replacement on any host. */
    private void handleReleasedContainer(int containerId, String containerIdStr, int exitStatus) {
        log.info("Got an exit code of {}. This means that container {} was killed by YARN, either due to being released by the application master or being 'lost' due to node failures etc. or due to preemption by the RM", exitStatus, containerIdStr);
        this.state.releasedContainers.incrementAndGet();
        if (containerId == UNKNOWN_CONTAINER_ID) {
            return;
        }
        log.info("Released container {} was assigned task group ID {}. Requesting a new container for the task group.", containerIdStr, containerId);
        this.state.neededContainers.incrementAndGet();
        this.state.jobHealthy.set(false);
        this.containerAllocator.requestContainer(containerId, ANY_HOST);
    }

    /** Records a clean exit and marks the job SUCCEEDED once every container has completed. */
    private void handleSuccessfulContainer(int containerId, String containerIdStr) {
        log.info("Container {} completed successfully.", containerIdStr);
        this.state.completedContainers.incrementAndGet();
        if (containerId != UNKNOWN_CONTAINER_ID) {
            this.state.finishedContainers.add(Integer.valueOf(containerId));
            // A clean exit clears any failure history kept for this container ID.
            this.containerFailures.remove(Integer.valueOf(containerId));
        }
        if (this.state.completedContainers.get() == this.state.containerCount) {
            log.info("Setting job status to SUCCEEDED, since all containers have been marked as completed.");
            this.state.status = FinalApplicationStatus.SUCCEEDED;
        }
    }

    /** Records a failed container and, unless the retry policy has given up, requests a replacement. */
    private void handleFailedContainer(int containerId, String containerIdStr, int exitStatus, ContainerStatus containerStatus) {
        log.info("Container failed for some reason. Let's start it again");
        log.info("Container " + containerIdStr + " failed with exit code " + exitStatus + " - " + containerStatus.getDiagnostics());
        this.state.failedContainers.incrementAndGet();
        this.state.failedContainersStatus.put(containerIdStr, containerStatus);
        this.state.jobHealthy.set(false);
        if (containerId == UNKNOWN_CONTAINER_ID) {
            return;
        }

        this.state.neededContainers.incrementAndGet();
        String preferredHost = this.state.jobCoordinator.jobModel().getContainerToHostValue(Integer.valueOf(containerId), "host");
        if (!this.hostAffinityEnabled || preferredHost == null) {
            preferredHost = ANY_HOST;
        }

        applyRetryPolicy(containerId, containerIdStr);

        if (!this.tooManyFailedContainers) {
            this.containerAllocator.requestContainer(containerId, preferredHost);
        }
    }

    /**
     * Updates the failure bookkeeping for a container ID and decides whether the
     * job must be failed. A retry count of 0 fails the job on the first failure;
     * a positive count fails it once that many failures have accumulated and the
     * latest one falls inside the retry window; a negative count does no
     * bookkeeping, so the caller always requests a replacement.
     */
    private void applyRetryPolicy(int containerId, String containerIdStr) {
        int retryCount = this.yarnConfig.getContainerRetryCount();
        int retryWindowMs = this.yarnConfig.getContainerRetryWindowMs();

        if (retryCount == 0) {
            log.error("Container ID {} ({}) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed.", containerId, containerIdStr);
            // Actually mark the job failed, as the message states and as the retry-window branch below does.
            failJob();
            return;
        }
        if (retryCount < 0) {
            return;
        }

        ContainerFailure previousFailure = this.containerFailures.get(Integer.valueOf(containerId));
        int failureCount;
        long lastFailureMs;
        if (previousFailure != null) {
            failureCount = previousFailure.getCount() + 1;
            lastFailureMs = previousFailure.getLastFailure().longValue();
        } else {
            failureCount = 1;
            // No earlier failure: a zero timestamp puts the "last failure" far outside any retry window.
            lastFailureMs = 0;
        }

        if (failureCount < retryCount) {
            log.info("Current fail count for container ID {} is {}.", containerId, failureCount);
            this.containerFailures.put(Integer.valueOf(containerId), new ContainerFailure(failureCount, Long.valueOf(System.currentTimeMillis())));
            return;
        }

        long sinceLastFailureMs = System.currentTimeMillis() - lastFailureMs;
        if (sinceLastFailureMs < retryWindowMs) {
            log.error("Container ID " + containerId + "(" + containerIdStr + ") has failed " + failureCount + " times, with last failure " + sinceLastFailureMs + "ms ago. This is greater than retry count of " + retryCount + " and window of " + retryWindowMs + "ms , so shutting down the application master, and marking the job as failed.");
            failJob();
        } else {
            log.info("Resetting fail count for container ID {} back to 1, since last container failure ({}) for this container ID was outside the bounds of the retry window.", containerId, containerIdStr);
            this.containerFailures.put(Integer.valueOf(containerId), new ContainerFailure(1, Long.valueOf(System.currentTimeMillis())));
        }
    }

    /** Flags the application master for shutdown and records FAILED as the final job status. */
    private void failJob() {
        this.tooManyFailedContainers = true;
        this.state.status = FinalApplicationStatus.FAILED;
    }
}
