package org.apache.samza.job.yarn;

import java.util.List;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.samza.config.YarnConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/job/yarn/HostAwareContainerAllocator.class */
/**
 * Container allocator that tries to satisfy each pending container request with an
 * allocated container on the request's preferred host, and falls back to a container
 * registered under {@code "ANY_HOST"} once the request has waited longer than the
 * configured container request timeout.
 *
 * <p>{@link #run()} keeps processing the request queue until the inherited
 * {@code isRunning} flag is cleared, the thread is interrupted, or an unexpected
 * exception is thrown.
 */
public class HostAwareContainerAllocator extends AbstractContainerAllocator {
    private static final Logger log = LoggerFactory.getLogger(HostAwareContainerAllocator.class);

    /** Host key queried for fallback containers when none are available on the preferred host. */
    private static final String ANY_HOST_NAME = "ANY_HOST";

    /** Maximum age, in milliseconds, of a request before it may be served from the fallback host key. */
    private final int containerRequestTimeoutMs;

    /**
     * Creates the allocator and reads the container request timeout from the YARN config.
     *
     * @param aMRMClientAsync client handed to the superclass and to the new {@code ContainerRequestState}
     * @param containerUtil   helper used to launch containers once a request has been assigned
     * @param yarnConfig      configuration supplying {@code getContainerRequestTimeout()}
     */
    public HostAwareContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> aMRMClientAsync, ContainerUtil containerUtil, YarnConfig yarnConfig) {
        // NOTE(review): the 'true' flag presumably enables host-affinity tracking in
        // ContainerRequestState -- confirm against that class.
        super(aMRMClientAsync, containerUtil, new ContainerRequestState(aMRMClientAsync, true), yarnConfig);
        this.containerRequestTimeoutMs = yarnConfig.getContainerRequestTimeout();
    }

    /**
     * Main allocator loop. On every pass it drains as much of the request queue as it can,
     * releases extra containers, and then sleeps for {@code ALLOCATOR_SLEEP_TIME}.
     *
     * <p>The head of the queue is only peeked; it is handed to
     * {@code updateStateAfterAssignment} when a container has been found for it. If the head
     * request cannot be served yet, draining stops for this pass.
     *
     * <p>The loop terminates when the thread is interrupted (the interrupt status is restored
     * before returning) or when any other exception escapes the loop body.
     */
    @Override // org.apache.samza.job.yarn.AbstractContainerAllocator, java.lang.Runnable
    public void run() {
        while (this.isRunning.get()) {
            try {
                while (!this.containerRequestState.getRequestsQueue().isEmpty()) {
                    SamzaContainerRequest request = this.containerRequestState.getRequestsQueue().peek();
                    String preferredHost = request.getPreferredHost();
                    int expectedContainerId = request.getExpectedContainerId();
                    log.info("Handling request for container id {} on preferred host {}", expectedContainerId, preferredHost);

                    List<Container> preferredHostContainers = this.containerRequestState.getContainersOnAHost(preferredHost);
                    if (hasContainers(preferredHostContainers)) {
                        // A container is already allocated on the preferred host: use it.
                        Container matched = preferredHostContainers.get(0);
                        this.containerRequestState.updateStateAfterAssignment(request, preferredHost, matched);
                        log.info("Running {} on {}", expectedContainerId, matched.getId());
                        this.containerUtil.runMatchedContainer(expectedContainerId, matched);
                        continue;
                    }

                    log.info("Did not find any allocated containers on preferred host {} for running container id {}", preferredHost, expectedContainerId);
                    boolean expired = requestExpired(request);
                    List<Container> anyHostContainers = this.containerRequestState.getContainersOnAHost(ANY_HOST_NAME);
                    if (!expired || !hasContainers(anyHostContainers)) {
                        // NOTE(review): this branch is taken when the request has NOT yet exceeded the
                        // timeout, or when no fallback container exists; the message wording below
                        // suggests the opposite for the timeout case. Text left unchanged on purpose.
                        log.info("Either the request timestamp {} is greater than container request timeout {}ms or we couldn't find any free allocated containers in the buffer. Breaking out of loop.", request.getRequestTimestamp(), this.containerRequestTimeoutMs);
                        break;
                    }

                    // The request waited longer than the timeout and a fallback container exists.
                    Container fallback = anyHostContainers.get(0);
                    log.info("Found available containers on ANY_HOST. Assigning request for container_id {} with timestamp {} to container {}", new Object[]{String.valueOf(expectedContainerId), request.getRequestTimestamp(), fallback.getId()});
                    this.containerRequestState.updateStateAfterAssignment(request, ANY_HOST_NAME, fallback);
                    log.info("Running {} on {}", expectedContainerId, fallback.getId());
                    this.containerUtil.runContainer(expectedContainerId, fallback);
                }
                this.containerRequestState.releaseExtraContainers();
                Thread.sleep(this.ALLOCATOR_SLEEP_TIME);
            } catch (InterruptedException e) {
                log.info("Got an InterruptedException in HostAwareContainerAllocator thread!", e);
                // Restore the interrupt status so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // This ends the allocator loop for good, so report it at error level.
                log.error("Got an unknown Exception in HostAwareContainerAllocator thread!", e);
                return;
            }
        }
    }

    /**
     * Tells whether the given container list holds at least one container.
     *
     * @param containers list returned by the request state; may be {@code null}
     * @return {@code true} if the list is non-null and non-empty
     */
    private static boolean hasContainers(List<Container> containers) {
        return containers != null && !containers.isEmpty();
    }

    /**
     * Checks whether a request has been pending for longer than the configured timeout.
     *
     * @param samzaContainerRequest request whose timestamp is compared against the current time
     * @return {@code true} if the elapsed time since the request timestamp strictly exceeds the timeout
     */
    private boolean requestExpired(SamzaContainerRequest samzaContainerRequest) {
        long elapsedMs = System.currentTimeMillis() - samzaContainerRequest.getRequestTimestamp().longValue();
        return elapsedMs > (long) this.containerRequestTimeoutMs;
    }
}
