package com.alibaba.ageiport.processor.core.task.importer.worker;

import com.alibaba.ageiport.common.feature.FeatureUtils;
import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.CollectionUtils;
import com.alibaba.ageiport.common.utils.IOUtils;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.common.utils.TaskIdUtil;
import com.alibaba.ageiport.ext.arch.ExtensionLoader;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.constants.ConstValues;
import com.alibaba.ageiport.processor.core.constants.MainTaskFeatureKeys;
import com.alibaba.ageiport.processor.core.constants.TaskStatus;
import com.alibaba.ageiport.processor.core.model.api.BizColumnHeaders;
import com.alibaba.ageiport.processor.core.model.api.BizDynamicColumnHeaders;
import com.alibaba.ageiport.processor.core.model.api.BizUser;
import com.alibaba.ageiport.processor.core.model.core.ColumnHeader;
import com.alibaba.ageiport.processor.core.model.core.ColumnHeaders;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.cache.BigDataCache;
import com.alibaba.ageiport.processor.core.spi.client.CreateSubTasksRequest;
import com.alibaba.ageiport.processor.core.spi.convertor.Model;
import com.alibaba.ageiport.processor.core.spi.file.DataGroup;
import com.alibaba.ageiport.processor.core.spi.file.FileContext;
import com.alibaba.ageiport.processor.core.spi.file.FileReader;
import com.alibaba.ageiport.processor.core.spi.file.FileReaderFactory;
import com.alibaba.ageiport.processor.core.spi.file.FileWriter;
import com.alibaba.ageiport.processor.core.spi.file.FileWriterFactory;
import com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskContextFactory;
import com.alibaba.ageiport.processor.core.spi.task.monitor.TaskStageEvent;
import com.alibaba.ageiport.processor.core.spi.task.selector.TaskSpiSelector;
import com.alibaba.ageiport.processor.core.spi.task.slice.SliceStrategy;
import com.alibaba.ageiport.processor.core.spi.task.stage.MainTaskStageProvider;
import com.alibaba.ageiport.processor.core.spi.task.stage.Stage;
import com.alibaba.ageiport.processor.core.spi.task.stage.SubTaskStageProvider;
import com.alibaba.ageiport.processor.core.task.AbstractMainTaskWorker;
import com.alibaba.ageiport.processor.core.task.importer.ImportProcessor;
import com.alibaba.ageiport.processor.core.task.importer.adapter.ImportProcessorAdapter;
import com.alibaba.ageiport.processor.core.task.importer.context.ImportMainTaskContext;
import com.alibaba.ageiport.processor.core.task.importer.model.ImportTaskRuntimeConfig;
import com.alibaba.ageiport.processor.core.task.importer.model.ImportTaskRuntimeConfigImpl;
import com.alibaba.ageiport.processor.core.task.importer.model.ImportTaskSpecification;
import com.alibaba.ageiport.processor.core.task.importer.slice.ImportSlice;
import com.alibaba.ageiport.processor.core.task.importer.slice.ImportSliceStrategy;
import com.alibaba.ageiport.processor.core.utils.HeadersUtil;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/* loaded from: input_file:BOOT-INF/lib/ageiport-processor-core-0.2.5.jar:com/alibaba/ageiport/processor/core/task/importer/worker/ImportMainTaskWorker.class */
public class ImportMainTaskWorker<QUERY, DATA, VIEW> extends AbstractMainTaskWorker {
    public static Logger log = LoggerFactory.getLogger(ImportMainTaskWorker.class);

    public ImportMainTaskWorker(AgeiPort ageiPort, MainTask mainTask) {
        this.ageiPort = ageiPort;
        this.mainTask = mainTask;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskWorker
    public void doPrepare() {
        AgeiPort ageiPort = getAgeiPort();
        String mainTaskId = this.mainTask.getMainTaskId();
        String executeType = this.mainTask.getExecuteType();
        String type = this.mainTask.getType();
        String code = this.mainTask.getCode();
        FileReader fileReader = null;
        try {
            try {
                TaskSpiSelector taskSpiSelector = ageiPort.getTaskSpiSelector();
                MainTaskStageProvider mainTaskStageProvider = (MainTaskStageProvider) taskSpiSelector.selectExtension(executeType, type, code, MainTaskStageProvider.class);
                ImportMainTaskContext importMainTaskContext = (ImportMainTaskContext) ((MainTaskContextFactory) taskSpiSelector.selectExtension(executeType, type, code, MainTaskContextFactory.class)).create(ageiPort, this.mainTask);
                importMainTaskContext.getMainTask().setStatus(TaskStatus.EXECUTING);
                importMainTaskContext.getMainTask().setGmtStart(new Date());
                importMainTaskContext.save();
                importMainTaskContext.setStage(mainTaskStageProvider.mainTaskStart());
                ImportTaskSpecification importTaskSpec = importMainTaskContext.getImportTaskSpec();
                ImportProcessor<QUERY, DATA, VIEW> importProcessor = importTaskSpec.getImportProcessor();
                ImportProcessorAdapter importProcessorAdapter = (ImportProcessorAdapter) importProcessor.getConcreteAdapter();
                BizUser bizUser = importMainTaskContext.getBizUser();
                Object query = importMainTaskContext.getQuery();
                importMainTaskContext.goNextStageEventNew();
                importMainTaskContext.load(importProcessorAdapter.taskRuntimeConfig(bizUser, query, importProcessor, importMainTaskContext));
                importMainTaskContext.save();
                importMainTaskContext.goNextStageEventNew();
                ImportTaskRuntimeConfig importTaskRuntimeConfig = importMainTaskContext.getImportTaskRuntimeConfig();
                String fileType = importTaskRuntimeConfig.getFileType();
                importMainTaskContext.goNextStageEventNew();
                importMainTaskContext.load((ImportMainTaskContext) importProcessorAdapter.resetQuery(bizUser, query, importProcessor, importMainTaskContext));
                importMainTaskContext.goNextStageEventNew();
                importMainTaskContext.goNextStageEventNew();
                BizColumnHeaders headers = importProcessorAdapter.getHeaders(bizUser, query, importProcessor, importMainTaskContext);
                importMainTaskContext.goNextStageEventNew();
                importMainTaskContext.goNextStageEventNew();
                BizDynamicColumnHeaders dynamicHeaders = importProcessorAdapter.getDynamicHeaders(bizUser, query, importProcessor, importMainTaskContext);
                importMainTaskContext.goNextStageEventNew();
                ColumnHeaders buildHeaders = HeadersUtil.buildHeaders(headers, importTaskSpec.getViewClass(), dynamicHeaders);
                for (ColumnHeader columnHeader : buildHeaders.getColumnHeaders()) {
                    if (columnHeader.getIgnoreHeader() == null) {
                        columnHeader.setIgnoreHeader(false);
                    }
                }
                importMainTaskContext.load(buildHeaders);
                importMainTaskContext.goNextStageEventNew();
                InputStream inputStream = ageiPort.getFileStore().get((String) FeatureUtils.getFeature(this.mainTask.getFeature(), MainTaskFeatureKeys.INPUT_FILE_KEY), new HashMap());
                FileReaderFactory fileReaderFactory = (FileReaderFactory) ExtensionLoader.getExtensionLoader(FileReaderFactory.class).getExtension(ageiPort.getOptions().getFileTypeReaderSpiMappings().get(fileType));
                FileContext fileContext = new FileContext();
                fileContext.setBizQuery(JsonUtil.toJsonString(this.mainTask.getBizQuery()));
                fileContext.setTaskSpec(importMainTaskContext.getImportTaskSpec());
                fileContext.setMainTask(this.mainTask);
                fileReader = fileReaderFactory.create(ageiPort, buildHeaders, fileContext);
                fileReader.read(inputStream);
                importMainTaskContext.load(fileReader.finish());
                importMainTaskContext.goNextStageEventNew();
                importMainTaskContext.goNextStageEventNew();
                List<ImportSlice> slice = ((ImportSliceStrategy) ExtensionLoader.getExtensionLoader(SliceStrategy.class).getExtension(importTaskRuntimeConfig.getTaskSliceStrategy())).slice(importMainTaskContext);
                importMainTaskContext.load(slice);
                importMainTaskContext.goNextStageEventNew();
                importMainTaskContext.save();
                importMainTaskContext.goNextStageEventNew();
                BigDataCache bigDataCacheCache = ageiPort.getBigDataCacheManager().getBigDataCacheCache(this.mainTask.getExecuteType());
                ArrayList arrayList = new ArrayList();
                for (ImportSlice importSlice : slice) {
                    CreateSubTasksRequest.SubTaskInstance subTaskInstance = new CreateSubTasksRequest.SubTaskInstance();
                    subTaskInstance.setSubTaskNo(importSlice.getNo());
                    subTaskInstance.setBizQuery(importSlice.getQueryJson());
                    Map<String, Object> map = Model.toMap(importTaskRuntimeConfig);
                    ImportTaskRuntimeConfigImpl importTaskRuntimeConfigImpl = (ImportTaskRuntimeConfigImpl) Model.toModel(map, new ImportTaskRuntimeConfigImpl());
                    importTaskRuntimeConfigImpl.setNo(importSlice.getNo());
                    importTaskRuntimeConfigImpl.setPageSize(importSlice.getPageSize());
                    subTaskInstance.setRuntimeParam(FeatureUtils.merge(JsonUtil.toJsonString(map), JsonUtil.toJsonString(importTaskRuntimeConfigImpl)));
                    arrayList.add(subTaskInstance);
                    bigDataCacheCache.put(TaskIdUtil.genSubTaskId(mainTaskId, importSlice.getNo()) + ConstValues.CACHE_SUFFIX_INPUT_DATA_SLICE, importSlice.getDataGroup());
                }
                CreateSubTasksRequest createSubTasksRequest = new CreateSubTasksRequest();
                createSubTasksRequest.setMainTaskId(mainTaskId);
                createSubTasksRequest.setSubTaskInstances(arrayList);
                ageiPort.getTaskServerClient().createSubTask(createSubTasksRequest);
                importMainTaskContext.goNextStageEventNew();
                importMainTaskContext.assertCurrentStage(mainTaskStageProvider.mainTaskSaveSliceEnd());
                Stage subTaskCreated = ((SubTaskStageProvider) taskSpiSelector.selectExtension(executeType, type, code, SubTaskStageProvider.class)).subTaskCreated();
                Iterator<ImportSlice> it = slice.iterator();
                while (it.hasNext()) {
                    ageiPort.getEventBusManager().getEventBus(executeType).post(TaskStageEvent.subTaskEvent(TaskIdUtil.genSubTaskId(mainTaskId, it.next().getNo()), subTaskCreated));
                }
                IOUtils.closeQuietly(fileReader);
            } catch (Throwable th) {
                log.error("doPrepare failed, main:{}", mainTaskId, th);
                ageiPort.onError(this.mainTask, th);
                IOUtils.closeQuietly(fileReader);
            }
        } catch (Throwable th2) {
            IOUtils.closeQuietly(fileReader);
            throw th2;
        }
    }

    @Override // com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskWorker
    public void doReduce() {
        FileWriter fileWriter = null;
        InputStream inputStream = null;
        MainTask mainTask = getMainTask();
        try {
            try {
                String executeType = mainTask.getExecuteType();
                String type = mainTask.getType();
                String code = mainTask.getCode();
                TaskSpiSelector taskSpiSelector = this.ageiPort.getTaskSpiSelector();
                MainTaskContextFactory mainTaskContextFactory = (MainTaskContextFactory) taskSpiSelector.selectExtension(executeType, type, code, MainTaskContextFactory.class);
                MainTaskStageProvider mainTaskStageProvider = (MainTaskStageProvider) taskSpiSelector.selectExtension(executeType, type, code, MainTaskStageProvider.class);
                ImportMainTaskContext importMainTaskContext = (ImportMainTaskContext) mainTaskContextFactory.create(this.ageiPort, mainTask);
                importMainTaskContext.setStage(mainTaskStageProvider.mainTaskReduceStart());
                importMainTaskContext.eventCurrentStage();
                ImportTaskRuntimeConfig importTaskRuntimeConfig = importMainTaskContext.getImportTaskRuntimeConfig();
                ArrayList arrayList = new ArrayList();
                BigDataCache bigDataCacheCache = this.ageiPort.getBigDataCacheManager().getBigDataCacheCache(mainTask.getExecuteType());
                for (int i = 1; i <= mainTask.getSubTotalCount().intValue(); i++) {
                    String genSubTaskId = TaskIdUtil.genSubTaskId(mainTask.getMainTaskId(), Integer.valueOf(i));
                    if (bigDataCacheCache.exist(genSubTaskId)) {
                        arrayList.add(genSubTaskId);
                    }
                }
                if (CollectionUtils.isNotEmpty(arrayList)) {
                    FileWriterFactory fileWriterFactory = (FileWriterFactory) ExtensionLoader.getExtensionLoader(FileWriterFactory.class).getExtension(this.ageiPort.getOptions().getFileTypeWriterSpiMappings().get(importTaskRuntimeConfig.getFileType()));
                    ColumnHeaders columnHeaders = importMainTaskContext.getColumnHeaders();
                    FileContext fileContext = new FileContext();
                    fileContext.setBizQuery(JsonUtil.toJsonString(mainTask.getBizQuery()));
                    fileContext.setTaskSpec(importMainTaskContext.getImportTaskSpec());
                    fileContext.setMainTask(mainTask);
                    fileWriter = fileWriterFactory.create(this.ageiPort, columnHeaders, fileContext);
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        fileWriter.write((DataGroup) bigDataCacheCache.remove((String) it.next(), DataGroup.class));
                    }
                    inputStream = fileWriter.finish();
                }
                importMainTaskContext.goNextStageEventNew();
                importMainTaskContext.goNextStageEventNew();
                String str = mainTask.getMainTaskId() + "." + importTaskRuntimeConfig.getFileType();
                if (CollectionUtils.isNotEmpty(arrayList)) {
                    this.ageiPort.getFileStore().save(str, inputStream, new HashMap());
                }
                MainTask mainTask2 = importMainTaskContext.getMainTask();
                mainTask2.setFeature(FeatureUtils.putFeature(mainTask2.getFeature(), MainTaskFeatureKeys.OUTPUT_FILE_KEY, str));
                importMainTaskContext.goNextStageEventNew();
                onFinished(importMainTaskContext);
                importMainTaskContext.goNextStageEventNew();
                importMainTaskContext.assertCurrentStage(mainTaskStageProvider.mainTaskFinished());
                IOUtils.closeQuietly(fileWriter);
                IOUtils.closeQuietly(inputStream);
            } catch (Throwable th) {
                log.error("StandaloneExportMainTaskWorker#doReduce failed, main:{}", mainTask.getMainTaskId(), th);
                this.ageiPort.onError(mainTask, th);
                IOUtils.closeQuietly(fileWriter);
                IOUtils.closeQuietly(inputStream);
            }
        } catch (Throwable th2) {
            IOUtils.closeQuietly(fileWriter);
            IOUtils.closeQuietly(inputStream);
            throw th2;
        }
    }
}
