package com.alibaba.ageiport.processor.core.dispatcher.local;

import com.alibaba.ageiport.common.utils.TaskIdUtil;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.TaskSpec;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.dispatcher.Dispatcher;
import com.alibaba.ageiport.processor.core.spi.dispatcher.RootDispatcherContext;
import com.alibaba.ageiport.processor.core.spi.dispatcher.SubDispatcherContext;
import com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskWorker;
import com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskWorkerFactory;
import com.alibaba.ageiport.processor.core.spi.task.factory.SubTaskWorkerFactory;
import java.util.Iterator;

/* loaded from: input_file:BOOT-INF/lib/ageiport-processor-core-0.2.4.jar:com/alibaba/ageiport/processor/core/dispatcher/local/LocalDispatcher.class */
public class LocalDispatcher implements Dispatcher {
    private AgeiPort ageiPort;
    private LocalDispatcherOptions options;

    public LocalDispatcher(AgeiPort ageiPort, LocalDispatcherOptions localDispatcherOptions) {
        this.ageiPort = ageiPort;
        this.options = localDispatcherOptions;
    }

    @Override // com.alibaba.ageiport.processor.core.spi.dispatcher.Dispatcher
    public void dispatchMainTaskPrepare(RootDispatcherContext rootDispatcherContext) {
        MainTask mainTask = this.ageiPort.getTaskServerClient().getMainTask(rootDispatcherContext.getMainTaskId());
        TaskSpec taskSpec = this.ageiPort.getSpecificationRegistry().get(mainTask.getCode());
        MainTaskWorker create = ((MainTaskWorkerFactory) this.ageiPort.getTaskSpiSelector().selectExtension(taskSpec.getExecuteType(), taskSpec.getTaskType(), mainTask.getCode(), MainTaskWorkerFactory.class)).create(this.ageiPort, mainTask);
        create.isReduce(false);
        this.ageiPort.getMainWorkerExecutor().submit(create);
    }

    @Override // com.alibaba.ageiport.processor.core.spi.dispatcher.Dispatcher
    public void dispatchSubTasks(SubDispatcherContext subDispatcherContext) {
        String mainTaskId = subDispatcherContext.getMainTaskId();
        MainTask mainTask = this.ageiPort.getTaskServerClient().getMainTask(mainTaskId);
        TaskSpec taskSpec = this.ageiPort.getSpecificationRegistry().get(mainTask.getCode());
        SubTaskWorkerFactory subTaskWorkerFactory = (SubTaskWorkerFactory) this.ageiPort.getTaskSpiSelector().selectExtension(taskSpec.getExecuteType(), taskSpec.getTaskType(), mainTask.getCode(), SubTaskWorkerFactory.class);
        Iterator<Integer> it = subDispatcherContext.getSubTaskNos().iterator();
        while (it.hasNext()) {
            this.ageiPort.getSubWorkerExecutor().submit(subTaskWorkerFactory.create(this.ageiPort, this.ageiPort.getTaskServerClient().getSubTask(TaskIdUtil.genSubTaskId(mainTaskId, it.next()))));
        }
    }

    @Override // com.alibaba.ageiport.processor.core.spi.dispatcher.Dispatcher
    public void dispatchMainTaskReduce(RootDispatcherContext rootDispatcherContext) {
        MainTask mainTask = this.ageiPort.getTaskServerClient().getMainTask(rootDispatcherContext.getMainTaskId());
        TaskSpec taskSpec = this.ageiPort.getSpecificationRegistry().get(mainTask.getCode());
        MainTaskWorker create = ((MainTaskWorkerFactory) this.ageiPort.getTaskSpiSelector().selectExtension(taskSpec.getExecuteType(), taskSpec.getTaskType(), mainTask.getCode(), MainTaskWorkerFactory.class)).create(this.ageiPort, mainTask);
        create.isReduce(true);
        this.ageiPort.getMainWorkerExecutor().submit(create);
    }
}
