package com.alibaba.ageiport.processor.core.task.exporter.worker;

import com.alibaba.ageiport.common.feature.FeatureUtils;
import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.IOUtils;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.common.utils.TaskIdUtil;
import com.alibaba.ageiport.ext.arch.ExtensionLoader;
import com.alibaba.ageiport.ext.file.store.FileStore;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.constants.MainTaskFeatureKeys;
import com.alibaba.ageiport.processor.core.constants.TaskStatus;
import com.alibaba.ageiport.processor.core.model.api.BizColumnHeaders;
import com.alibaba.ageiport.processor.core.model.api.BizDynamicColumnHeaders;
import com.alibaba.ageiport.processor.core.model.api.BizUser;
import com.alibaba.ageiport.processor.core.model.core.ColumnHeader;
import com.alibaba.ageiport.processor.core.model.core.ColumnHeaders;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.client.CreateSubTasksRequest;
import com.alibaba.ageiport.processor.core.spi.convertor.Model;
import com.alibaba.ageiport.processor.core.spi.file.DataGroup;
import com.alibaba.ageiport.processor.core.spi.file.FileContext;
import com.alibaba.ageiport.processor.core.spi.file.FileWriter;
import com.alibaba.ageiport.processor.core.spi.file.FileWriterFactory;
import com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskContextFactory;
import com.alibaba.ageiport.processor.core.spi.task.monitor.TaskStageEvent;
import com.alibaba.ageiport.processor.core.spi.task.selector.TaskSpiSelector;
import com.alibaba.ageiport.processor.core.spi.task.slice.SliceStrategy;
import com.alibaba.ageiport.processor.core.spi.task.stage.MainTaskStageProvider;
import com.alibaba.ageiport.processor.core.spi.task.stage.Stage;
import com.alibaba.ageiport.processor.core.spi.task.stage.SubTaskStageProvider;
import com.alibaba.ageiport.processor.core.task.AbstractMainTaskWorker;
import com.alibaba.ageiport.processor.core.task.exporter.ExportProcessor;
import com.alibaba.ageiport.processor.core.task.exporter.adapter.ExportProcessorAdapter;
import com.alibaba.ageiport.processor.core.task.exporter.context.ExportMainTaskContext;
import com.alibaba.ageiport.processor.core.task.exporter.model.ExportTaskRuntimeConfig;
import com.alibaba.ageiport.processor.core.task.exporter.model.ExportTaskRuntimeConfigImpl;
import com.alibaba.ageiport.processor.core.task.exporter.model.ExportTaskSpecification;
import com.alibaba.ageiport.processor.core.task.exporter.slice.ExportSlice;
import com.alibaba.ageiport.processor.core.task.exporter.slice.ExportSliceStrategy;
import com.alibaba.ageiport.processor.core.utils.HeadersUtil;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/* loaded from: input_file:com/alibaba/ageiport/processor/core/task/exporter/worker/ExportMainTaskWorker.class */
/**
 * Main-task worker for export tasks.
 *
 * <p>{@link #doPrepare()} builds the export context, collects the runtime configuration,
 * total count and column headers from the processor adapter, slices the task and registers
 * one sub-task per slice with the task server. {@link #doReduce()} reads the data group of
 * every sub-task from the big-data cache, writes them through a {@link FileWriter} and saves
 * the resulting stream to the {@link FileStore}.
 *
 * <p>Both entry points catch every {@link Throwable}, log it and delegate to
 * {@code onError(Throwable)} instead of propagating it.
 *
 * @param <QUERY> query type of the {@link ExportProcessor} taken from the task specification
 * @param <DATA>  data type of the {@link ExportProcessor} taken from the task specification
 * @param <VIEW>  view type of the {@link ExportProcessor} taken from the task specification
 */
public class ExportMainTaskWorker<QUERY, DATA, VIEW> extends AbstractMainTaskWorker {
    public static Logger log = LoggerFactory.getLogger(ExportMainTaskWorker.class);

    /**
     * Creates a worker for one main task.
     *
     * @param ageiPort the framework instance used to reach SPI selectors, clients and stores
     * @param mainTask the main task this worker prepares and reduces
     */
    public ExportMainTaskWorker(AgeiPort ageiPort, MainTask mainTask) {
        this.ageiPort = ageiPort;
        this.mainTask = mainTask;
    }

    /**
     * Prepares the export: marks the main task as executing, loads runtime config, query,
     * total count and headers into the context, slices the task, creates the sub-tasks on the
     * task server and posts a "sub-task created" stage event for each slice.
     *
     * <p>The number and order of {@code goNextStageEventNew()} calls is significant: the method
     * ends by asserting that the context has reached {@code mainTaskSaveSliceEnd()}, so the
     * calls must not be added, removed or reordered without adjusting the stage provider.
     *
     * <p>Any {@link Throwable} is logged and handed to {@code onError(Throwable)}.
     */
    @Override // com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskWorker
    public void doPrepare() {
        AgeiPort ageiPort = getAgeiPort();
        String mainTaskId = this.mainTask.getMainTaskId();
        String executeType = this.mainTask.getExecuteType();
        String type = this.mainTask.getType();
        String code = this.mainTask.getCode();
        try {
            TaskSpiSelector spiSelector = ageiPort.getTaskSpiSelector();
            MainTaskStageProvider stageProvider = (MainTaskStageProvider) spiSelector.selectExtension(executeType, type, code, MainTaskStageProvider.class);
            MainTaskContextFactory contextFactory = (MainTaskContextFactory) spiSelector.selectExtension(executeType, type, code, MainTaskContextFactory.class);
            ExportMainTaskContext context = (ExportMainTaskContext) contextFactory.create(ageiPort, this.mainTask);

            // Persist the "executing" state before any stage event is emitted.
            context.getMainTask().setStatus(TaskStatus.EXECUTING);
            context.getMainTask().setGmtStart(new Date());
            context.save();
            context.setStage(stageProvider.mainTaskStart());

            ExportTaskSpecification exportTaskSpec = context.getExportTaskSpec();
            ExportProcessor<QUERY, DATA, VIEW> processor = exportTaskSpec.getProcessor();
            ExportProcessorAdapter adapter = (ExportProcessorAdapter) processor.getConcreteAdapter();
            BizUser bizUser = context.getBizUser();
            Object query = context.getQuery();

            // Runtime configuration.
            context.goNextStageEventNew();
            context.load(adapter.taskRuntimeConfig(bizUser, query, processor, context));
            context.goNextStageEventNew();

            // Reset query. The result is held as Object rather than cast to
            // ExportMainTaskContext: a query object is not a context, so that cast would
            // raise ClassCastException at runtime.
            // NOTE(review): assumes the context exposes a load overload accepting the
            // (erased) query type - confirm against ExportMainTaskContext.
            context.goNextStageEventNew();
            Object resetQuery = adapter.resetQuery(bizUser, query, processor, context);
            context.load(resetQuery);
            context.goNextStageEventNew();

            // Total count.
            context.goNextStageEventNew();
            context.load(adapter.totalCount(bizUser, query, processor, context));
            context.goNextStageEventNew();

            // Static headers.
            context.goNextStageEventNew();
            BizColumnHeaders bizHeaders = adapter.getHeaders(bizUser, query, processor, context);
            context.goNextStageEventNew();

            // Dynamic headers.
            context.goNextStageEventNew();
            BizDynamicColumnHeaders bizDynamicHeaders = adapter.getDynamicHeaders(bizUser, query, processor, context);
            context.goNextStageEventNew();

            ColumnHeaders columnHeaders = HeadersUtil.buildHeaders(bizHeaders, exportTaskSpec.getViewClass(), bizDynamicHeaders);
            for (ColumnHeader columnHeader : columnHeaders.getColumnHeaders()) {
                // When no explicit ignore flag is set, fall back to the error-header flag.
                if (columnHeader.getIgnoreHeader() == null) {
                    columnHeader.setIgnoreHeader(columnHeader.getErrorHeader());
                }
            }
            context.load(columnHeaders);

            // Slice the task with the strategy named in the runtime configuration.
            context.goNextStageEventNew();
            ExportSliceStrategy sliceStrategy = (ExportSliceStrategy) ExtensionLoader.getExtensionLoader(SliceStrategy.class).getExtension(context.getExportTaskRuntimeConfig().getTaskSliceStrategy());
            List<ExportSlice> slices = sliceStrategy.slice(context);
            context.load(slices);
            context.goNextStageEventNew();
            context.save();
            context.goNextStageEventNew();

            // Build one sub-task instance per slice; each gets the main runtime config
            // merged with its own number, page offset and page size.
            List<CreateSubTasksRequest.SubTaskInstance> subTaskInstances = new ArrayList<>();
            for (ExportSlice exportSlice : slices) {
                CreateSubTasksRequest.SubTaskInstance subTaskInstance = new CreateSubTasksRequest.SubTaskInstance();
                subTaskInstance.setSubTaskNo(exportSlice.getNo());
                subTaskInstance.setBizQuery(exportSlice.getQueryJson());
                Map<String, Object> mainConfigMap = Model.toMap(context.getExportTaskRuntimeConfig());
                ExportTaskRuntimeConfigImpl subTaskConfig = (ExportTaskRuntimeConfigImpl) Model.toModel(mainConfigMap, new ExportTaskRuntimeConfigImpl());
                subTaskConfig.setNo(exportSlice.getNo());
                subTaskConfig.setPageOffset(exportSlice.getOffset());
                subTaskConfig.setPageSize(exportSlice.getSize());
                subTaskInstance.setRuntimeParam(FeatureUtils.merge(JsonUtil.toJsonString(mainConfigMap), JsonUtil.toJsonString(subTaskConfig)));
                subTaskInstances.add(subTaskInstance);
            }
            CreateSubTasksRequest createSubTasksRequest = new CreateSubTasksRequest();
            createSubTasksRequest.setMainTaskId(mainTaskId);
            createSubTasksRequest.setSubTaskInstances(subTaskInstances);
            ageiPort.getTaskServerClient().createSubTask(createSubTasksRequest);

            // Announce every created sub-task on the event bus of this execute type.
            Stage subTaskCreated = ((SubTaskStageProvider) spiSelector.selectExtension(executeType, type, code, SubTaskStageProvider.class)).subTaskCreated();
            for (ExportSlice exportSlice : slices) {
                ageiPort.getEventBusManager().getEventBus(executeType).post(TaskStageEvent.subTaskEvent(TaskIdUtil.genSubTaskId(mainTaskId, exportSlice.getNo()), subTaskCreated));
            }
            context.goNextStageEventNew();
            context.assertCurrentStage(stageProvider.mainTaskSaveSliceEnd());
        } catch (Throwable th) {
            log.error("StandaloneExportMainTaskWorker#doPrepare failed, main:{}", new Object[]{mainTaskId, th});
            onError(th);
        }
    }

    /**
     * Reduces the export: removes the data group of each sub-task (numbered from 1 to the
     * main task's sub-task total) from the big-data cache, writes it to a file writer chosen
     * by the configured file type, stores the finished stream in the file store under
     * {@code <mainTaskId>.<fileType>} and records that key as a feature of the main task.
     *
     * <p>Any {@link Throwable} is logged and handed to {@code onError(Throwable)}. The file
     * writer and the output stream are always closed quietly, on success and on failure.
     */
    @Override // com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskWorker
    public void doReduce() {
        FileWriter fileWriter = null;
        InputStream inputStream = null;
        MainTask mainTask = getMainTask();
        try {
            String executeType = mainTask.getExecuteType();
            String type = mainTask.getType();
            String code = mainTask.getCode();
            TaskSpiSelector spiSelector = this.ageiPort.getTaskSpiSelector();
            MainTaskContextFactory contextFactory = (MainTaskContextFactory) spiSelector.selectExtension(executeType, type, code, MainTaskContextFactory.class);
            MainTaskStageProvider stageProvider = (MainTaskStageProvider) spiSelector.selectExtension(executeType, type, code, MainTaskStageProvider.class);
            ExportMainTaskContext context = (ExportMainTaskContext) contextFactory.create(this.ageiPort, mainTask);
            context.setStage(stageProvider.mainTaskReduceStart());
            context.eventCurrentStage();

            // Pick the writer factory mapped to the configured file type.
            ExportTaskRuntimeConfig runtimeConfig = context.getExportTaskRuntimeConfig();
            FileWriterFactory fileWriterFactory = (FileWriterFactory) ExtensionLoader.getExtensionLoader(FileWriterFactory.class).getExtension(this.ageiPort.getOptions().getFileTypeWriterSpiMappings().get(runtimeConfig.getFileType()));
            ColumnHeaders columnHeaders = context.getColumnHeaders();
            FileContext fileContext = new FileContext();
            fileContext.setBizQuery(JsonUtil.toJsonString(mainTask.getBizQuery()));
            fileContext.setTaskSpec(context.getExportTaskSpec());
            fileContext.setMainTask(mainTask);
            fileWriter = fileWriterFactory.create(this.ageiPort, columnHeaders, fileContext);

            // Sub-task numbers are 1-based; each cached data group is removed as it is written.
            int subTaskCount = mainTask.getSubTotalCount().intValue();
            for (int subTaskNo = 1; subTaskNo <= subTaskCount; subTaskNo++) {
                fileWriter.write((DataGroup) this.ageiPort.getBigDataCacheManager().getBigDataCacheCache(mainTask.getExecuteType()).remove(TaskIdUtil.genSubTaskId(mainTask.getMainTaskId(), Integer.valueOf(subTaskNo)), DataGroup.class));
            }
            inputStream = fileWriter.finish();
            context.goNextStageEventNew();
            context.goNextStageEventNew();

            // Store the file and remember its key on the main task.
            FileStore fileStore = this.ageiPort.getFileStore();
            String fileKey = mainTask.getMainTaskId() + "." + runtimeConfig.getFileType();
            fileStore.save(fileKey, inputStream, new HashMap());
            MainTask contextMainTask = context.getMainTask();
            contextMainTask.setFeature(FeatureUtils.putFeature(contextMainTask.getFeature(), MainTaskFeatureKeys.OUTPUT_FILE_KEY, fileKey));
            context.goNextStageEventNew();
            onFinished(context);
            context.goNextStageEventNew();
            context.assertCurrentStage(stageProvider.mainTaskFinished());
        } catch (Throwable th) {
            log.error("StandaloneExportMainTaskWorker#doReduce failed, main:{}", new Object[]{mainTask.getMainTaskId(), th});
            onError(th);
        } finally {
            // Runs on success, on handled failure, and when onError itself throws.
            IOUtils.closeQuietly(fileWriter);
            IOUtils.closeQuietly(inputStream);
        }
    }
}
