package org.apache.samza.clustermanager;

import org.apache.samza.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/clustermanager/HostAwareContainerAllocator.class */
/**
 * Container allocator that tries to run each pending resource request on the
 * host the request prefers, and falls back to {@link ResourceRequestState#ANY_HOST}
 * only once the request has been pending longer than the configured timeout.
 *
 * <p>Pending requests are handled strictly in the order returned by
 * {@code peekPendingRequest()}: if the request at the head cannot be assigned,
 * the allocator stops and leaves every later request untouched for this pass.
 */
public class HostAwareContainerAllocator extends AbstractContainerAllocator {
    private static final Logger log = LoggerFactory.getLogger(HostAwareContainerAllocator.class);

    /**
     * Maximum age of a pending request, in milliseconds, before it may be run
     * on {@link ResourceRequestState#ANY_HOST} instead of its preferred host.
     */
    private final int requestTimeout;

    /**
     * Creates an allocator backed by a new {@link ResourceRequestState}.
     *
     * @param clusterResourceManager resource manager handed to the superclass and to the request state
     * @param requestTimeoutMs       how long, in milliseconds, a request may wait for its preferred host
     * @param config                 configuration handed to the superclass
     * @param samzaApplicationState  application state handed to the superclass
     */
    public HostAwareContainerAllocator(ClusterResourceManager clusterResourceManager, int requestTimeoutMs, Config config, SamzaApplicationState samzaApplicationState) {
        // NOTE(review): the `true` flag presumably enables host affinity in ResourceRequestState - confirm against that class.
        super(clusterResourceManager, new ResourceRequestState(true, clusterResourceManager), config, samzaApplicationState);
        this.requestTimeout = requestTimeoutMs;
    }

    /**
     * Assigns pending requests to allocated resources, one at a time from the head of the queue.
     *
     * <ul>
     *   <li>If a resource is allocated on the request's preferred host, the request is run
     *       there and the matched-request counter is incremented.</li>
     *   <li>Otherwise, if the request has expired and a resource is allocated for
     *       {@link ResourceRequestState#ANY_HOST}, the request is run on that resource.</li>
     *   <li>Otherwise the method returns immediately without handling later requests.</li>
     * </ul>
     */
    @Override // org.apache.samza.clustermanager.AbstractContainerAllocator
    public void assignResourceRequests() {
        // NOTE(review): loop progress relies on runStreamProcessor consuming the head request - confirm in the superclass.
        while (hasPendingRequest()) {
            SamzaResourceRequest request = peekPendingRequest();
            String preferredHost = request.getPreferredHost();
            int containerID = request.getContainerID();
            log.info("Handling request: {} {} {}", containerID, request.getRequestTimestampMs(), preferredHost);

            if (hasAllocatedResource(preferredHost)) {
                log.info("Found a matched-container {} on the preferred host. Running on {}", containerID, preferredHost);
                runStreamProcessor(request, preferredHost);
                this.state.matchedResourceRequests.incrementAndGet();
                continue;
            }

            log.info("Did not find any allocated resources on preferred host {} for running container id {}", preferredHost, containerID);
            boolean expired = requestExpired(request);
            boolean anyHostAvailable = hasAllocatedResource(ResourceRequestState.ANY_HOST);
            if (!expired || !anyHostAvailable) {
                // Keep waiting: either the preferred host may still show up in time,
                // or there is simply nothing buffered to fall back to.
                log.info("Request for container {} with timestamp {} cannot be assigned yet: expired after {}ms timeout = {}, "
                        + "allocated resource available on ANY_HOST = {}. Breaking out of loop.",
                        containerID, request.getRequestTimestampMs(), this.requestTimeout, expired, anyHostAvailable);
                return;
            }

            log.info("Request expired. running on ANY_HOST");
            runStreamProcessor(request, ResourceRequestState.ANY_HOST);
        }
    }

    /**
     * Tells whether a request has been pending for strictly longer than the request timeout.
     *
     * @param samzaResourceRequest the request whose timestamp is compared with the current wall-clock time
     * @return {@code true} if the elapsed time since the request timestamp exceeds the timeout
     */
    private boolean requestExpired(SamzaResourceRequest samzaResourceRequest) {
        long now = System.currentTimeMillis();
        boolean expired = now - samzaResourceRequest.getRequestTimestampMs() > this.requestTimeout;
        if (expired) {
            log.info("Request {} with currTime {} has expired", samzaResourceRequest, now);
        }
        return expired;
    }
}
