package com.alibaba.ageiport.processor.core.task.importer.worker;

import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.CollectionUtils;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.constants.ConstValues;
import com.alibaba.ageiport.processor.core.model.api.BizDataGroup;
import com.alibaba.ageiport.processor.core.model.api.BizUser;
import com.alibaba.ageiport.processor.core.model.core.impl.SubTask;
import com.alibaba.ageiport.processor.core.spi.cache.BigDataCache;
import com.alibaba.ageiport.processor.core.spi.file.DataGroup;
import com.alibaba.ageiport.processor.core.spi.task.factory.SubTaskContextFactory;
import com.alibaba.ageiport.processor.core.spi.task.monitor.TaskStageEvent;
import com.alibaba.ageiport.processor.core.spi.task.selector.TaskSpiSelector;
import com.alibaba.ageiport.processor.core.spi.task.stage.SubTaskStageProvider;
import com.alibaba.ageiport.processor.core.task.AbstractSubTaskWorker;
import com.alibaba.ageiport.processor.core.task.importer.ImportProcessor;
import com.alibaba.ageiport.processor.core.task.importer.adapter.ImportProcessorAdapter;
import com.alibaba.ageiport.processor.core.task.importer.context.ImportSubTaskContext;
import com.alibaba.ageiport.processor.core.task.importer.model.BizImportResult;
import com.alibaba.ageiport.processor.core.task.importer.model.BizImportResultImpl;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:BOOT-INF/lib/ageiport-processor-core-0.3.0.jar:com/alibaba/ageiport/processor/core/task/importer/worker/ImportSubTaskWorker.class */
public class ImportSubTaskWorker<QUERY, DATA, VIEW> extends AbstractSubTaskWorker {
    public static Logger log = LoggerFactory.getLogger(ImportMainTaskWorker.class);

    @Override // com.alibaba.ageiport.processor.core.spi.task.factory.SubTaskWorker
    public void doMappingProcess() {
        AgeiPort ageiPort = getAgeiPort();
        SubTask subTask = getSubTask();
        try {
            String subTaskId = subTask.getSubTaskId();
            String executeType = subTask.getExecuteType();
            String type = subTask.getType();
            String code = subTask.getCode();
            TaskSpiSelector taskSpiSelector = ageiPort.getTaskSpiSelector();
            SubTaskContextFactory subTaskContextFactory = (SubTaskContextFactory) taskSpiSelector.selectExtension(executeType, type, code, SubTaskContextFactory.class);
            SubTaskStageProvider subTaskStageProvider = (SubTaskStageProvider) taskSpiSelector.selectExtension(executeType, type, code, SubTaskStageProvider.class);
            ageiPort.getEventBusManager().getEventBus(executeType).post(TaskStageEvent.subTaskEvent(subTaskId, subTaskStageProvider.subTaskDispatchedOnNode()));
            ImportSubTaskContext<QUERY, DATA, VIEW> importSubTaskContext = (ImportSubTaskContext) subTaskContextFactory.create(ageiPort, subTaskId);
            importSubTaskContext.setStage(subTaskStageProvider.subTaskStart());
            ImportProcessor<QUERY, DATA, VIEW> importProcessor = importSubTaskContext.getImportTaskSpec().getImportProcessor();
            ImportProcessorAdapter importProcessorAdapter = (ImportProcessorAdapter) importProcessor.getConcreteAdapter();
            BizUser bizUser = importSubTaskContext.getBizUser();
            QUERY query = importSubTaskContext.getQuery();
            importSubTaskContext.goNextStageEventNew();
            BigDataCache bigDataCacheCache = ageiPort.getBigDataCacheManager().getBigDataCacheCache(executeType);
            DataGroup dataGroup = (DataGroup) bigDataCacheCache.get(subTaskId + ConstValues.CACHE_SUFFIX_INPUT_DATA_SLICE, DataGroup.class);
            importSubTaskContext.goNextStageEventNew();
            importSubTaskContext.goNextStageEventNew();
            DataGroup checkHeaders = importProcessorAdapter.checkHeaders(bizUser, query, dataGroup, importProcessor, importSubTaskContext);
            importSubTaskContext.goNextStageEventNew();
            importSubTaskContext.goNextStageEventNew();
            BizDataGroup<VIEW> bizDataGroup = importProcessorAdapter.getBizDataGroup(bizUser, query, checkHeaders, importProcessor, importSubTaskContext);
            importSubTaskContext.goNextStageEventNew();
            importSubTaskContext.goNextStageEventNew();
            List<VIEW> flat = importProcessorAdapter.flat(bizUser, query, bizDataGroup, importProcessor, importSubTaskContext);
            importSubTaskContext.goNextStageEventNew();
            importSubTaskContext.goNextStageEventNew();
            BizImportResult<VIEW, DATA> convertAndCheck = importProcessorAdapter.convertAndCheck(bizUser, query, flat, importProcessor, importSubTaskContext);
            importSubTaskContext.goNextStageEventNew();
            importSubTaskContext.goNextStageEventNew();
            BizImportResult<VIEW, DATA> write = importProcessorAdapter.write(bizUser, query, convertAndCheck.getData(), importProcessor, importSubTaskContext);
            importSubTaskContext.goNextStageEventNew();
            BizImportResultImpl bizImportResultImpl = new BizImportResultImpl();
            ArrayList arrayList = new ArrayList();
            if (CollectionUtils.isNotEmpty(convertAndCheck.getView())) {
                arrayList.addAll(convertAndCheck.getView());
            }
            if (CollectionUtils.isNotEmpty(write.getView())) {
                arrayList.addAll(write.getView());
            }
            bizImportResultImpl.setView(arrayList);
            importSubTaskContext.goNextStageEventNew();
            BizDataGroup<VIEW> group = importProcessorAdapter.group(bizUser, query, arrayList, importProcessor, importSubTaskContext);
            importSubTaskContext.goNextStageEventNew();
            importSubTaskContext.goNextStageEventNew();
            DataGroup dataGroup2 = importProcessorAdapter.getDataGroup(bizUser, query, group, importProcessor, importSubTaskContext);
            importSubTaskContext.goNextStageEventNew();
            importSubTaskContext.goNextStageEventNew();
            List<DataGroup.Data> data = dataGroup2.getData();
            if (CollectionUtils.isNotEmpty(data)) {
                boolean z = false;
                Iterator<DataGroup.Data> it = data.iterator();
                while (it.hasNext()) {
                    if (CollectionUtils.isNotEmpty(it.next().getItems())) {
                        z = true;
                    }
                }
                if (z) {
                    bigDataCacheCache.put(subTaskId, dataGroup2);
                }
            }
            importSubTaskContext.goNextStageEventNew();
            importSubTaskContext.goNextStageEventNew();
            importSubTaskContext.assertCurrentStage(subTaskStageProvider.subTaskFinished());
        } catch (Throwable th) {
            log.info("doMappingProcess failed, main:{}, sub:{}", subTask.getMainTaskId(), subTask.getSubTaskId(), th);
            ageiPort.onMainError(subTask.getMainTaskId(), th);
            ageiPort.onSubError(subTask, th);
        }
    }
}
