package alluxio.master.job.plan;

import alluxio.exception.ExceptionMessage;
import alluxio.exception.JobDoesNotExistException;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.job.JobServerContext;
import alluxio.job.plan.PlanConfig;
import alluxio.job.plan.meta.PlanInfo;
import alluxio.job.wire.Status;
import alluxio.master.job.command.CommandManager;
import alluxio.master.job.workflow.WorkflowTracker;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:alluxio/master/job/plan/PlanTracker.class */
public class PlanTracker {
    private static final Logger LOG = LoggerFactory.getLogger(PlanTracker.class);
    private final long mMaxJobPurgeCount;
    private final long mCapacity;
    private final long mRetentionMs;
    private final ConcurrentHashMap<Long, PlanCoordinator> mCoordinators;
    private final SortedSet<PlanInfo> mFailed;
    private final LinkedBlockingQueue<PlanInfo> mFinished;
    private final WorkflowTracker mWorkflowTracker;

    public PlanTracker(long j, long j2, long j3, WorkflowTracker workflowTracker) {
        Preconditions.checkArgument(j >= 0 && j <= 2147483647L);
        this.mCapacity = j;
        Preconditions.checkArgument(j2 >= 0);
        this.mRetentionMs = j2;
        this.mMaxJobPurgeCount = j3 <= 0 ? Long.MAX_VALUE : j3;
        this.mCoordinators = new ConcurrentHashMap<>(0, 0.95f, Math.max(8, 2 * Runtime.getRuntime().availableProcessors()));
        this.mFailed = Collections.synchronizedSortedSet(new TreeSet((planInfo, planInfo2) -> {
            long lastStatusChangeMs = planInfo2.getLastStatusChangeMs() - planInfo.getLastStatusChangeMs();
            return lastStatusChangeMs != 0 ? Long.signum(lastStatusChangeMs) : Long.signum(planInfo2.getId() - planInfo.getId());
        }));
        this.mFinished = new LinkedBlockingQueue<>();
        this.mWorkflowTracker = workflowTracker;
    }

    private void statusChangeCallback(PlanInfo planInfo) {
        if (planInfo == null) {
            return;
        }
        this.mWorkflowTracker.onPlanStatusChange(planInfo);
        Status status = planInfo.getStatus();
        if (status.isFinished()) {
            if (status.equals(Status.FAILED)) {
                while (this.mFailed.size() >= this.mCapacity) {
                    this.mFailed.remove(this.mFailed.last());
                }
                this.mFailed.add(planInfo);
            }
            for (int i = 0; i < 2; i++) {
                if (this.mFinished.offer(planInfo)) {
                    return;
                }
                if (!removeFinished()) {
                    LOG.warn("Failed to remove any jobs from the finished queue in status change callback");
                }
            }
            if (this.mFinished.offer(planInfo)) {
                return;
            }
            LOG.warn("Failed to offer job id {} to finished queue, removing from tracking preemptively", Long.valueOf(planInfo.getId()));
        }
    }

    @Nullable
    public PlanCoordinator getCoordinator(long j) {
        return this.mCoordinators.get(Long.valueOf(j));
    }

    public synchronized void run(PlanConfig planConfig, CommandManager commandManager, JobServerContext jobServerContext, List<WorkerInfo> list, long j) throws JobDoesNotExistException, ResourceExhaustedException {
        if (!removeFinished()) {
            throw new ResourceExhaustedException(ExceptionMessage.JOB_MASTER_FULL_CAPACITY.getMessage(new Object[]{Long.valueOf(this.mCapacity)}));
        }
        this.mCoordinators.put(Long.valueOf(j), PlanCoordinator.create(commandManager, jobServerContext, list, Long.valueOf(j), planConfig, this::statusChangeCallback));
    }

    private synchronized boolean removeFinished() {
        PlanInfo peek;
        boolean z = false;
        if (!(((long) this.mCoordinators.size()) >= this.mCapacity)) {
            return true;
        }
        if (this.mFinished.isEmpty()) {
            return false;
        }
        ArrayList newArrayList = Lists.newArrayList();
        int i = 0;
        while (true) {
            if (this.mFinished.isEmpty() || i >= this.mMaxJobPurgeCount || (peek = this.mFinished.peek()) == null || CommonUtils.getCurrentMs() - peek.getLastStatusChangeMs() < this.mRetentionMs) {
                break;
            }
            if (this.mFinished.poll() == null) {
                LOG.warn("Polling the queue resulted in a null element");
                break;
            }
            long id = peek.getId();
            newArrayList.add(Long.valueOf(id));
            if (this.mCoordinators.get(Long.valueOf(id)) == null) {
                LOG.warn("Did not find a coordinator with id {}", Long.valueOf(id));
            } else {
                z = true;
                i++;
            }
        }
        this.mWorkflowTracker.cleanup(newArrayList);
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            this.mCoordinators.remove(Long.valueOf(((Long) it.next()).longValue()));
        }
        return z;
    }

    public Collection<Long> list() {
        return Collections.unmodifiableCollection(this.mCoordinators.keySet());
    }

    public Collection<PlanCoordinator> coordinators() {
        return Collections.unmodifiableCollection(this.mCoordinators.values());
    }

    public Stream<PlanInfo> failed() {
        return this.mFailed.stream();
    }

    public Set<Long> findJobs(String str, List<Status> list) {
        return (Set) this.mCoordinators.entrySet().stream().filter(entry -> {
            return list.isEmpty() || (list.contains(((PlanCoordinator) entry.getValue()).getPlanInfoWire(false).getStatus()) && (str == null || str.isEmpty() || ((PlanCoordinator) entry.getValue()).getPlanInfoWire(false).getName().equals(str)));
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toSet());
    }
}
