package com.alibaba.ageiport.processor.core.task.exporter.worker;

import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.constants.TaskStatus;
import com.alibaba.ageiport.processor.core.model.api.BizDataGroup;
import com.alibaba.ageiport.processor.core.model.api.BizUser;
import com.alibaba.ageiport.processor.core.model.api.impl.BizExportPageImpl;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.model.core.impl.SubTask;
import com.alibaba.ageiport.processor.core.spi.eventbus.EventBus;
import com.alibaba.ageiport.processor.core.spi.file.DataGroup;
import com.alibaba.ageiport.processor.core.spi.task.factory.SubTaskContextFactory;
import com.alibaba.ageiport.processor.core.spi.task.monitor.TaskStageEvent;
import com.alibaba.ageiport.processor.core.spi.task.selector.TaskSpiSelector;
import com.alibaba.ageiport.processor.core.spi.task.stage.CommonStage;
import com.alibaba.ageiport.processor.core.spi.task.stage.SubTaskStageProvider;
import com.alibaba.ageiport.processor.core.task.AbstractSubTaskWorker;
import com.alibaba.ageiport.processor.core.task.exporter.adapter.ExportProcessorAdapter;
import com.alibaba.ageiport.processor.core.task.exporter.context.ExportSubTaskContext;
import com.alibaba.ageiport.processor.core.task.exporter.model.ExportTaskRuntimeConfig;
import com.alibaba.ageiport.processor.core.task.exporter.model.ExportTaskSpecification;
import java.util.Date;
import java.util.List;

/* loaded from: input_file:com/alibaba/ageiport/processor/core/task/exporter/worker/ExportSubTaskWorker.class */
/**
 * Worker that executes a single export sub task: it queries one page of data, converts it
 * to view objects, groups it, turns the groups into a {@link DataGroup} and stores that
 * result in the big-data cache under the sub task id.
 *
 * <p>Progress is reported through the sub task context ({@code goNextStageEventNew()}) and
 * through the event bus selected by the sub task's execute type.
 *
 * @param <QUERY> type of the query object handed to the export processor
 * @param <DATA>  type of the raw rows returned by the processor's data query
 * @param <VIEW>  type of the rows after conversion, i.e. what is grouped and cached
 */
public class ExportSubTaskWorker<QUERY, DATA, VIEW> extends AbstractSubTaskWorker {
    /** Logger for this worker (previously bound to ExportMainTaskWorker by mistake). */
    public static Logger log = LoggerFactory.getLogger(ExportSubTaskWorker.class);

    /**
     * Runs the export pipeline for the current sub task.
     *
     * <p>Any {@link Throwable} raised while processing is caught here: the sub task and its
     * main task are marked as {@link TaskStatus#ERROR} and ERROR stage events are posted for
     * both. The throwable is not rethrown.
     */
    @Override // com.alibaba.ageiport.processor.core.spi.task.factory.SubTaskWorker
    public void doMappingProcess() {
        AgeiPort ageiPort = getAgeiPort();
        SubTask subTask = getSubTask();
        try {
            String subTaskId = subTask.getSubTaskId();
            String executeType = subTask.getExecuteType();
            String type = subTask.getType();
            String code = subTask.getCode();

            // Resolve the SPI extensions registered for this (executeType, type, code) triple.
            TaskSpiSelector spiSelector = ageiPort.getTaskSpiSelector();
            SubTaskContextFactory contextFactory =
                    (SubTaskContextFactory) spiSelector.selectExtension(executeType, type, code, SubTaskContextFactory.class);
            SubTaskStageProvider stageProvider =
                    (SubTaskStageProvider) spiSelector.selectExtension(executeType, type, code, SubTaskStageProvider.class);

            // Announce that the sub task has been dispatched before the context is created.
            ageiPort.getEventBusManager().getEventBus(executeType)
                    .post(TaskStageEvent.subTaskEvent(subTaskId, stageProvider.subTaskDispatchedOnNode()));

            ExportSubTaskContext<QUERY, DATA, VIEW> context =
                    (ExportSubTaskContext) contextFactory.create(ageiPort, subTaskId);
            context.setStage(stageProvider.subTaskStart());

            ExportTaskSpecification<QUERY, DATA, VIEW> taskSpec = context.getExportTaskSpec();
            ExportProcessorAdapter adapter = (ExportProcessorAdapter) taskSpec.getProcessor().getConcreteAdapter();
            BizUser bizUser = context.getBizUser();
            QUERY query = context.getQuery();
            context.goNextStageEventNew();

            BizExportPageImpl page = buildExportPage(context.getExportTaskRuntimeConfig());

            // NOTE(review): each step below is followed by two stage advances, presumably
            // "step finished" then "next step started" - confirm against SubTaskStageProvider.
            // The number and order of these calls must not change, because the final
            // assertCurrentStage depends on it.
            List<DATA> rows = adapter.queryData(bizUser, query, page, taskSpec.getProcessor(), context);
            context.goNextStageEventNew();
            context.goNextStageEventNew();

            List<VIEW> views = adapter.convert(bizUser, query, rows, taskSpec.getProcessor(), context);
            context.goNextStageEventNew();
            context.goNextStageEventNew();

            BizDataGroup<VIEW> bizGroup = adapter.group(bizUser, query, views, taskSpec.getProcessor(), context);
            context.goNextStageEventNew();
            context.goNextStageEventNew();

            DataGroup dataGroup = adapter.getDataGroup(bizUser, query, bizGroup, taskSpec.getProcessor(), context);
            context.goNextStageEventNew();
            context.goNextStageEventNew();

            // Store the result in the big-data cache, keyed by the sub task id.
            ageiPort.getBigDataCacheManager().getBigDataCacheCache(subTask.getExecuteType()).put(subTaskId, dataGroup);
            context.goNextStageEventNew();
            context.goNextStageEventNew();

            context.assertCurrentStage(stageProvider.subTaskFinished());
        } catch (Throwable th) {
            reportMappingFailure(ageiPort, subTask, th);
        }
    }

    /**
     * Builds the page descriptor passed to the processor's data query from the sub task's
     * runtime configuration (page number, offset, size and attributes).
     */
    private BizExportPageImpl buildExportPage(ExportTaskRuntimeConfig runtimeConfig) {
        BizExportPageImpl page = new BizExportPageImpl();
        page.setNo(runtimeConfig.getNo());
        page.setOffset(runtimeConfig.getPageOffset());
        page.setSize(runtimeConfig.getPageSize());
        page.setAttributes(runtimeConfig.getAttributes());
        return page;
    }

    /**
     * Records a processing failure: logs it, persists the ERROR state of the sub task and of
     * its main task, and posts ERROR stage events for both.
     */
    private void reportMappingFailure(AgeiPort ageiPort, SubTask subTask, Throwable th) {
        // The throwable is passed as the trailing argument, as in the original call.
        log.error("doMappingProcess failed, main:{}, sub:{}", subTask.getMainTaskId(), subTask.getSubTaskId(), th);

        // Some throwables (e.g. a bare NullPointerException) carry no message; fall back to
        // toString() so the stored result message is never null.
        String message = th.getMessage() != null ? th.getMessage() : th.toString();

        subTask.setStatus(TaskStatus.ERROR).setResultMessage(message).setGmtFinished(new Date());
        ageiPort.getTaskServerClient().updateSubTask(subTask);

        // Only the fields that should change are populated on this MainTask.
        MainTask mainTask = new MainTask()
                .setMainTaskId(subTask.getMainTaskId())
                .setStatus(TaskStatus.ERROR)
                .setGmtFinished(new Date())
                .setResultMessage(message);
        // Persist the main task's ERROR state. The previous code called updateSubTask a
        // second time here, so the MainTask built above was never written.
        ageiPort.getTaskServerClient().updateMainTask(mainTask);

        EventBus eventBus = ageiPort.getEventBusManager().getEventBus(subTask.getExecuteType());
        eventBus.post(TaskStageEvent.subTaskEvent(subTask.getSubTaskId(), CommonStage.ERROR));
        eventBus.post(TaskStageEvent.mainTaskEvent(mainTask.getMainTaskId(), CommonStage.ERROR));
    }
}
