package org.apache.samza.clustermanager;

import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.job.ShellCommandBuilder;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/clustermanager/AbstractContainerAllocator.class */
/**
 * Base class for allocator loops that match pending {@link SamzaResourceRequest}s against
 * resources held in a {@link ResourceRequestState} and launch stream processors on them.
 *
 * <p>Subclasses supply the matching policy by implementing {@link #assignResourceRequests()}.
 * The instance is a {@link Runnable}; {@link #run()} loops until {@link #stop()} is called or
 * the executing thread is interrupted.
 */
public abstract class AbstractContainerAllocator implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(AbstractContainerAllocator.class);

    /** Loop flag read by {@link #run()} and cleared by {@link #stop()}; volatile so a write from another thread is visible. */
    private volatile boolean isRunning = true;
    private final TaskConfig taskConfig;
    private final Config config;
    protected final ClusterResourceManager clusterResourceManager;
    /** Pause between two allocation passes, in milliseconds (passed to {@link Thread#sleep(long)}). */
    protected final int allocatorSleepIntervalMs;
    /** Memory, in MB, put on every resource request created by {@link #requestResource(int, String)}. */
    protected final int containerMemoryMb;
    /** CPU core count put on every resource request created by {@link #requestResource(int, String)}. */
    protected final int containerNumCpuCores;
    protected final SamzaApplicationState state;
    protected final ResourceRequestState resourceRequestState;

    /**
     * Creates an allocator, reading the sleep interval, container memory and core count from
     * a {@link ClusterManagerConfig} view of {@code config}.
     *
     * @param clusterResourceManager manager used to launch stream processors
     * @param resourceRequestState   holder of pending requests and allocated resources
     * @param config                 job configuration
     * @param samzaApplicationState  shared application state updated as containers are requested and launched
     */
    public AbstractContainerAllocator(ClusterResourceManager clusterResourceManager, ResourceRequestState resourceRequestState, Config config, SamzaApplicationState samzaApplicationState) {
        ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
        this.clusterResourceManager = clusterResourceManager;
        this.allocatorSleepIntervalMs = clusterManagerConfig.getAllocatorSleepTime();
        this.resourceRequestState = resourceRequestState;
        this.containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
        this.containerNumCpuCores = clusterManagerConfig.getNumCores();
        this.taskConfig = new TaskConfig(config);
        this.state = samzaApplicationState;
        this.config = config;
    }

    /**
     * Allocation loop: on every pass it calls {@link #assignResourceRequests()}, releases extra
     * resources, and then sleeps for {@link #allocatorSleepIntervalMs} milliseconds.
     *
     * <p>The loop ends when {@link #stop()} has been called or when the thread is interrupted.
     * Any other exception raised during a pass is logged and the loop continues.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (this.isRunning) {
            try {
                assignResourceRequests();
                this.resourceRequestState.releaseExtraResources();
                Thread.sleep(this.allocatorSleepIntervalMs);
            } catch (InterruptedException e) {
                log.warn("Got InterruptedException in AllocatorThread.", e);
                // Restore the flag for whoever owns this thread, then leave the loop. Staying in
                // the loop with the flag set would make every following sleep() throw at once,
                // turning the allocator into a busy loop.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e2) {
                log.error("Got unknown Exception in AllocatorThread.", e2);
            }
        }
    }

    /**
     * Performs one allocation pass. Invoked once per iteration of {@link #run()}.
     */
    protected abstract void assignResourceRequests();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Assigns the resource currently available on host {@code str} to the given request and
     * launches a stream processor on it.
     *
     * <p>On a successful launch the needed-container counter is decremented (marking the job
     * healthy when it reaches zero) and the resource is recorded as running. If the launch fails
     * with {@link SamzaContainerLaunchException}, the resource is released and a new request for
     * the same container is issued for {@link ResourceRequestState#ANY_HOST}.
     *
     * @param samzaResourceRequest the request being satisfied
     * @param str                  host whose allocated resource should be used
     * @throws SamzaException if no allocated resource is available on {@code str}
     */
    public void runStreamProcessor(SamzaResourceRequest samzaResourceRequest, String str) {
        SamzaResource resource = peekAllocatedResource(str);
        if (resource == null) {
            throw new SamzaException("Expected resource was unavailable on host " + str);
        }
        int containerID = samzaResourceRequest.getContainerID();
        // Build the command before touching any state, so a failure here leaves the request pending.
        CommandBuilder commandBuilder = getCommandBuilder(containerID);
        this.resourceRequestState.updateStateAfterAssignment(samzaResourceRequest, str, resource);
        log.info("Found available resources on {}. Assigning request for container_id {} with timestamp {} to resource {}", new Object[]{str, String.valueOf(containerID), samzaResourceRequest.getRequestTimestampMs(), resource.getResourceID()});
        try {
            this.clusterResourceManager.launchStreamProcessor(resource, commandBuilder);
            if (this.state.neededContainers.decrementAndGet() == 0) {
                this.state.jobHealthy.set(true);
            }
            this.state.runningContainers.put(Integer.valueOf(containerID), resource);
        } catch (SamzaContainerLaunchException e) {
            log.warn(String.format("Got exception while starting resource %s. Requesting a new resource on any host", resource), e);
            this.resourceRequestState.releaseUnstartableContainer(resource);
            requestResource(containerID, ResourceRequestState.ANY_HOST);
        }
    }

    /**
     * Issues one resource request per map entry.
     *
     * @param map container id to preferred host; a {@code null} host is replaced by
     *            {@link ResourceRequestState#ANY_HOST}
     */
    public void requestResources(Map<Integer, String> map) {
        for (Map.Entry<Integer, String> entry : map.entrySet()) {
            String host = entry.getValue();
            requestResource(entry.getKey().intValue(), host != null ? host : ResourceRequestState.ANY_HOST);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return {@code true} if {@link #peekPendingRequest()} returns a non-null request
     */
    public final boolean hasPendingRequest() {
        return peekPendingRequest() != null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return whatever {@link ResourceRequestState#peekPendingRequest()} returns; callers in this
     *         class treat {@code null} as "no pending request"
     */
    public final SamzaResourceRequest peekPendingRequest() {
        return this.resourceRequestState.peekPendingRequest();
    }

    /**
     * Adds a new request, sized with the configured core count and memory, to the request state
     * and increments the container-request counter.
     *
     * @param i   id of the container the resource is requested for
     * @param str host the resource is requested on
     */
    public final void requestResource(int i, String str) {
        SamzaResourceRequest request = new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb, str, i);
        this.resourceRequestState.addResourceRequest(request);
        this.state.containerRequests.incrementAndGet();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @param str host to check
     * @return {@code true} if {@link #peekAllocatedResource(String)} returns a non-null resource
     */
    public boolean hasAllocatedResource(String str) {
        return peekAllocatedResource(str) != null;
    }

    /**
     * @param str host to look up
     * @return whatever {@link ResourceRequestState#peekResource(String)} returns for the host;
     *         callers in this class treat {@code null} as "nothing allocated"
     */
    protected SamzaResource peekAllocatedResource(String str) {
        return this.resourceRequestState.peekResource(str);
    }

    /**
     * Instantiates the command builder class named in the task config (falling back to
     * {@link ShellCommandBuilder}) and sets the job config, the container id and the job model
     * server URL on it.
     *
     * @param i container id to set on the builder
     * @return the configured builder
     */
    private CommandBuilder getCommandBuilder(int i) {
        String builderClassName = this.taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
        CommandBuilder commandBuilder = (CommandBuilder) Util.getObj(builderClassName);
        commandBuilder.setConfig(this.config).setId(i).setUrl(this.state.jobModelManager.server().getUrl());
        return commandBuilder;
    }

    /**
     * Hands a resource to the request state.
     *
     * @param samzaResource the resource to add
     */
    public final void addResource(SamzaResource samzaResource) {
        this.resourceRequestState.addResource(samzaResource);
    }

    /**
     * Clears the running flag; {@link #run()} exits once it next evaluates its loop condition.
     * This method does not interrupt the thread itself.
     */
    public void stop() {
        this.isRunning = false;
    }
}
