package io.ray.streaming.runtime.worker.tasks;

import io.ray.streaming.runtime.core.processor.Processor;
import io.ray.streaming.runtime.core.processor.SourceProcessor;
import io.ray.streaming.runtime.transfer.exception.ChannelInterruptException;
import io.ray.streaming.runtime.worker.JobWorker;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/worker/tasks/SourceStreamTask.class */
public class SourceStreamTask extends StreamTask {
    private static final Logger LOG = LoggerFactory.getLogger(SourceStreamTask.class);
    private final SourceProcessor sourceProcessor;
    private final AtomicReference<Long> pendingBarrier;
    private long lastCheckpointId;

    public SourceStreamTask(Processor processor, JobWorker jobWorker, long j) {
        super(processor, jobWorker, j);
        this.pendingBarrier = new AtomicReference<>();
        this.lastCheckpointId = 0L;
        this.sourceProcessor = (SourceProcessor) this.processor;
    }

    @Override // io.ray.streaming.runtime.worker.tasks.StreamTask
    protected void init() {
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Source stream task thread start.");
        while (this.running) {
            try {
                this.isInitialState = false;
                Long l = this.pendingBarrier.get();
                if (l != null) {
                    if (this.pendingBarrier.compareAndSet(l, null)) {
                        LOG.info("Start to do checkpoint {}, worker name is {}.", l, this.jobWorker.getWorkerContext().getWorkerName());
                        doCheckpoint(l.longValue(), null);
                        LOG.info("Finish to do checkpoint {}.", l);
                    } else {
                        LOG.warn("Pending checkpointId modify unexpected, expect={}, now={}.", l, this.pendingBarrier.get());
                    }
                }
                this.sourceProcessor.fetch();
            } catch (Throwable th) {
                if ((th instanceof ChannelInterruptException) || (ExceptionUtils.getRootCause(th) instanceof ChannelInterruptException)) {
                    LOG.info("queue has stopped.");
                } else {
                    LOG.error("Last success checkpointId={}, now occur error.", Long.valueOf(this.lastCheckpointId), th);
                    requestRollback(ExceptionUtils.getStackTrace(th));
                }
            }
        }
        LOG.info("Source stream task thread exit.");
    }

    @Override // io.ray.streaming.runtime.worker.tasks.StreamTask
    public boolean triggerCheckpoint(Long l) {
        return this.pendingBarrier.compareAndSet(null, l);
    }
}
