package io.castled.pipelines;

import com.google.inject.Inject;
import io.castled.AppShutdownHandler;
import io.castled.ObjectRegistry;
import io.castled.apps.ExternalApp;
import io.castled.apps.ExternalAppConnector;
import io.castled.apps.ExternalAppService;
import io.castled.apps.ExternalAppType;
import io.castled.apps.models.DataSinkRequest;
import io.castled.commons.models.PipelineSyncStats;
import io.castled.commons.streams.DefaultDataSinkMessageOutputStream;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.commons.streams.MessageInputStreamImpl;
import io.castled.encryption.EncryptionManager;
import io.castled.errors.MysqlErrorTracker;
import io.castled.errors.SchemaMappedErrorTracker;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.exceptions.pipeline.PipelineExecutionException;
import io.castled.jarvis.taskmanager.TaskExecutor;
import io.castled.jarvis.taskmanager.exceptions.JarvisRetriableException;
import io.castled.jarvis.taskmanager.models.Task;
import io.castled.models.Pipeline;
import io.castled.models.PipelineRun;
import io.castled.models.PipelineRunStage;
import io.castled.models.PipelineRunStatus;
import io.castled.models.Warehouse;
import io.castled.pipelines.exceptions.PipelineInterruptedException;
import io.castled.schema.SchemaUtils;
import io.castled.schema.models.RecordSchema;
import io.castled.services.PipelineService;
import io.castled.services.QueryModelService;
import io.castled.utils.DataMappingUtils;
import io.castled.warehouses.WarehouseConnector;
import io.castled.warehouses.WarehouseService;
import io.castled.warehouses.WarehouseSyncFailureListener;
import io.castled.warehouses.WarehouseType;
import io.castled.warehouses.models.WarehousePollContext;
import io.castled.warehouses.models.WarehousePollResult;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import jodd.util.StringPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task executor that performs one sync run of a pipeline: it polls records from the pipeline's
 * warehouse, maps them onto the destination app's schema, hands them to the app's data sink and
 * records the outcome of the run through {@link PipelineService}.
 */
public class PipelineExecutor implements TaskExecutor {
    private static final Logger log = LoggerFactory.getLogger(PipelineExecutor.class);

    /** Key under which the task params carry the id of the pipeline to run. */
    private static final String PIPELINE_ID_PARAM = "pipeline_id";

    private final PipelineService pipelineService;
    private final Map<WarehouseType, WarehouseConnector> warehouseConnectors;
    private final Map<ExternalAppType, ExternalAppConnector> externalAppConnectors;
    private final WarehouseService warehouseService;
    private final ExternalAppService externalAppService;
    private final EncryptionManager encryptionManager;
    private final MonitoredDataSink monitoredDataSink;
    private final QueryModelService queryModelService;

    /**
     * Mutable holder pairing the record stream obtained from a warehouse poll with the warehouse
     * schema returned by that same poll.
     */
    public static class WarehouseExecutionContext {
        private MessageInputStreamImpl messageInputStreamImpl;
        private RecordSchema warehouseSchema;

        public WarehouseExecutionContext(MessageInputStreamImpl messageInputStreamImpl, RecordSchema recordSchema) {
            this.messageInputStreamImpl = messageInputStreamImpl;
            this.warehouseSchema = recordSchema;
        }

        public WarehouseExecutionContext() {
        }

        public MessageInputStreamImpl getMessageInputStreamImpl() {
            return this.messageInputStreamImpl;
        }

        public RecordSchema getWarehouseSchema() {
            return this.warehouseSchema;
        }

        public void setMessageInputStreamImpl(MessageInputStreamImpl messageInputStreamImpl) {
            this.messageInputStreamImpl = messageInputStreamImpl;
        }

        public void setWarehouseSchema(RecordSchema recordSchema) {
            this.warehouseSchema = recordSchema;
        }

        /**
         * Two contexts are equal when both their input streams and their warehouse schemas are
         * equal (or both absent). {@link #canEqual(Object)} lets subclasses opt out of equality
         * with this type.
         */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof WarehouseExecutionContext)) {
                return false;
            }
            WarehouseExecutionContext other = (WarehouseExecutionContext) obj;
            return other.canEqual(this)
                    && Objects.equals(getMessageInputStreamImpl(), other.getMessageInputStreamImpl())
                    && Objects.equals(getWarehouseSchema(), other.getWarehouseSchema());
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof WarehouseExecutionContext;
        }

        /** Combines both fields with multiplier 59, using 43 in place of a null field's hash. */
        @Override
        public int hashCode() {
            MessageInputStreamImpl inputStream = getMessageInputStreamImpl();
            RecordSchema schema = getWarehouseSchema();
            int result = 59 + (inputStream == null ? 43 : inputStream.hashCode());
            return (result * 59) + (schema == null ? 43 : schema.hashCode());
        }

        @Override
        public String toString() {
            return "PipelineExecutor.WarehouseExecutionContext(messageInputStreamImpl=" + getMessageInputStreamImpl()
                    + ", warehouseSchema=" + getWarehouseSchema() + ")";
        }
    }

    @Inject
    public PipelineExecutor(PipelineService pipelineService,
                            Map<WarehouseType, WarehouseConnector> warehouseConnectors,
                            WarehouseService warehouseService,
                            Map<ExternalAppType, ExternalAppConnector> externalAppConnectors,
                            ExternalAppService externalAppService,
                            EncryptionManager encryptionManager,
                            MonitoredDataSink monitoredDataSink,
                            QueryModelService queryModelService) {
        this.pipelineService = pipelineService;
        this.warehouseConnectors = warehouseConnectors;
        this.warehouseService = warehouseService;
        this.externalAppConnectors = externalAppConnectors;
        this.externalAppService = externalAppService;
        this.encryptionManager = encryptionManager;
        this.monitoredDataSink = monitoredDataSink;
        this.queryModelService = queryModelService;
    }

    /**
     * Runs one sync for the pipeline named by the task's {@code pipeline_id} param.
     *
     * <p>Nothing is done when the pipeline service returns no active pipeline for that id. On
     * success the run is marked processed; on failure it is marked failed, the error is logged
     * once, and the input stream plus the poller/failure-listener resources are cleaned up.
     *
     * @param task task whose params contain a numeric {@code pipeline_id}
     * @return always {@code null}
     * @throws CastledRuntimeException      if {@code pipeline_id} is missing or not numeric, or the
     *                                      failure carries an unrecognised pipeline error type
     * @throws PipelineInterruptedException if the run fails while application shutdown has been
     *                                      triggered; the run is then neither marked failed nor
     *                                      cleaned up
     * @throws JarvisRetriableException     if the run fails with an {@code INTERMITTENT} pipeline error
     */
    @Override
    public String executeTask(Task task) {
        Long pipelineId = extractPipelineId(task);
        Pipeline pipeline = this.pipelineService.getActivePipeline(pipelineId);
        if (pipeline == null) {
            return null;
        }
        Warehouse warehouse = this.warehouseService.getWarehouse(pipeline.getWarehouseId());
        PipelineRun pipelineRun = getOrCreatePipelineRun(pipelineId);
        WarehousePollContext pollContext = WarehousePollContext.builder()
                .primaryKeys(getWarehousePrimaryKeys(pipeline))
                .pipelineUUID(pipeline.getUuid())
                .pipelineRunId(pipelineRun.getId())
                .warehouseConfig(warehouse.getConfig())
                .dataEncryptionKey(this.encryptionManager.getEncryptionKey(warehouse.getTeamId()))
                .queryMode(pipeline.getQueryMode())
                .query(getQuery(pipeline))
                .pipelineId(pipeline.getId())
                .build();
        // Looked up once; it is only dereferenced inside the try/catch, as before.
        WarehouseConnector warehouseConnector = this.warehouseConnectors.get(warehouse.getType());

        WarehouseSyncFailureListener failureListener = null;
        // Non-null only while the mapped input stream is open, so the failure path can close it.
        SchemaMappedMessageInputStream unclosedInputStream = null;
        try {
            WarehouseExecutionContext executionContext = pollRecords(warehouse, pipelineRun, pollContext);
            log.info("Poll records completed for pipeline {}", pipeline.getName());
            this.pipelineService.updatePipelineRunstage(pipelineRun.getId(), PipelineRunStage.RECORDS_POLLED);

            ExternalApp externalApp = this.externalAppService.getExternalApp(pipeline.getAppId());
            ExternalAppConnector externalAppConnector = this.externalAppConnectors.get(externalApp.getType());
            RecordSchema appSchema = externalAppConnector.getSchema(externalApp.getConfig(), pipeline.getAppSyncConfig()).getAppSchema();
            log.info("App schema fetch completed for pipeline {}", pipeline.getName());

            pollContext.setWarehouseSchema(executionContext.getWarehouseSchema());
            failureListener = warehouseConnector.syncFailureListener(pollContext);
            MysqlErrorTracker mysqlErrorTracker = new MysqlErrorTracker(pollContext);

            // Errors raised while mapping warehouse records onto the app schema.
            ErrorOutputStream mappingErrors = new ErrorOutputStream(
                    new DefaultDataSinkMessageOutputStream(failureListener), mysqlErrorTracker);
            SchemaMappedMessageInputStream mappedInputStream = new SchemaMappedMessageInputStream(
                    appSchema,
                    executionContext.getMessageInputStreamImpl(),
                    DataMappingUtils.appWarehouseMapping(pipeline.getDataMapping()),
                    DataMappingUtils.warehouseAppMapping(pipeline.getDataMapping()),
                    mappingErrors);
            unclosedInputStream = mappedInputStream;

            // Errors reported by the data sink, mapped back to the warehouse schema.
            ErrorOutputStream sinkErrors = new ErrorOutputStream(
                    new SchemaMappedMessageOutputStream(
                            SchemaUtils.filterSchema(pollContext.getWarehouseSchema(), getWarehousePrimaryKeys(pipeline)),
                            failureListener,
                            DataMappingUtils.warehouseAppMapping(pipeline.getDataMapping())),
                    new SchemaMappedErrorTracker(
                            mysqlErrorTracker,
                            executionContext.getWarehouseSchema(),
                            DataMappingUtils.warehouseAppMapping(pipeline.getDataMapping())));

            log.info("App Sync started for pipeline {}", pipeline.getName());
            PipelineSyncStats syncStats = this.monitoredDataSink.syncRecords(
                    externalAppConnector.getDataSink(),
                    pipelineRun.getPipelineSyncStats(),
                    pipelineRun.getId(),
                    DataSinkRequest.builder()
                            .externalApp(externalApp)
                            .errorOutputStream(sinkErrors)
                            .appSyncConfig(pipeline.getAppSyncConfig())
                            .mappedFields(DataMappingUtils.getMappedAppFields(pipeline.getDataMapping()))
                            .objectSchema(appSchema)
                            .primaryKeys(DataMappingUtils.getPrimaryKeys(pipeline.getDataMapping()))
                            .mapping(pipeline.getDataMapping())
                            .queryMode(pipeline.getQueryMode())
                            .messageInputStream(mappedInputStream)
                            .build());

            // Cleared before closing so a failing close() is not retried by the failure path.
            unclosedInputStream = null;
            mappedInputStream.close();
            log.info("App Sync completed for pipeline {}", pipeline.getName());

            mappingErrors.flushFailedRecords();
            sinkErrors.flushFailedRecords();
            warehouseConnector.getDataPoller().cleanupPipelineRunResources(pollContext);

            // Records rejected during schema mapping never reach the sink; add them to its failure count.
            syncStats.setRecordsFailed(mappedInputStream.getFailedRecords() + syncStats.getRecordsFailed());
            this.pipelineService.markPipelineRunProcessed(pipelineRun.getId(), pipelineRun.getPipelineId(), syncStats);
            return null;
        } catch (Exception e) {
            // On shutdown the run is left untouched: not marked failed and no resources cleaned up.
            if (((AppShutdownHandler) ObjectRegistry.getInstance(AppShutdownHandler.class)).isShutdownTriggered()) {
                throw new PipelineInterruptedException();
            }
            this.pipelineService.markPipelineRunFailed(pipelineRun.getId(), pipelineRun.getPipelineId(),
                    Optional.ofNullable(e.getMessage()).orElse("Unknown Error"));
            // Single log statement for the failure; no further logging happens downstream.
            log.error("Pipeline run failed for pipeline {} ", pipeline.getId(), e);

            closeQuietly(unclosedInputStream);
            warehouseConnector.getDataPoller().cleanupPipelineRunResources(pollContext);
            if (failureListener != null) {
                failureListener.cleanupResources(pipeline.getUuid(), pipelineRun.getId(), warehouse.getConfig());
            }
            if (e instanceof PipelineExecutionException) {
                handlePipelineExecutionException(pipeline, (PipelineExecutionException) e);
            }
            return null;
        }
    }

    /**
     * Reads the numeric {@code pipeline_id} task param.
     *
     * @throws CastledRuntimeException if the param is absent or is not a {@link Number}
     */
    private static Long extractPipelineId(Task task) {
        Object rawPipelineId = task.getParams().get(PIPELINE_ID_PARAM);
        if (!(rawPipelineId instanceof Number)) {
            throw new CastledRuntimeException(String.format(
                    "Task param '%s' is missing or not numeric: %s", PIPELINE_ID_PARAM, rawPipelineId));
        }
        return Long.valueOf(((Number) rawPipelineId).longValue());
    }

    /** Best-effort close used on the failure path; a failing close is logged, never propagated. */
    private static void closeQuietly(SchemaMappedMessageInputStream inputStream) {
        if (inputStream == null) {
            return;
        }
        try {
            inputStream.close();
        } catch (Exception closeFailure) {
            log.warn("Failed to close message input stream after pipeline failure", closeFailure);
        }
    }

    /** Primary keys of the query model backing the pipeline. */
    private List<String> getWarehousePrimaryKeys(Pipeline pipeline) {
        return this.queryModelService.getQueryModelPrimaryKeys(pipeline.getModelId());
    }

    /** Source query of the query model backing the pipeline. */
    private String getQuery(Pipeline pipeline) {
        return this.queryModelService.getSourceQuery(pipeline.getModelId());
    }

    /**
     * Returns the pipeline's last run when it is still {@code PROCESSING}; otherwise creates a new
     * run and returns it. A last run without a status is treated as not processing.
     */
    private PipelineRun getOrCreatePipelineRun(Long pipelineId) {
        PipelineRun lastRun = this.pipelineService.getLastRun(pipelineId);
        // Constant-first comparison so a null status cannot raise a NullPointerException.
        if (lastRun != null && PipelineRunStatus.PROCESSING.equals(lastRun.getStatus())) {
            return lastRun;
        }
        return this.pipelineService.getPipelineRun(Long.valueOf(this.pipelineService.createPipelineRun(pipelineId)));
    }

    /**
     * Polls the warehouse for the run's records, resuming the previous poll when the run's stage
     * reports that records were already polled.
     *
     * <p>The resulting stream is created with the run's recorded sync offset when the poll result
     * says it was resumed, and with {@code 0} otherwise.
     */
    private WarehouseExecutionContext pollRecords(Warehouse warehouse, PipelineRun pipelineRun, WarehousePollContext warehousePollContext) throws Exception {
        WarehouseConnector warehouseConnector = this.warehouseConnectors.get(warehouse.getType());
        WarehousePollResult pollResult = pipelineRun.getStage().recordsPolled()
                ? warehouseConnector.getDataPoller().resumePoll(warehousePollContext)
                : warehouseConnector.getDataPoller().pollRecords(warehousePollContext);
        return new WarehouseExecutionContext(
                new MessageInputStreamImpl(pollResult.getRecordInputStream(),
                        pollResult.isResumed() ? pipelineRun.getPipelineSyncStats().getOffset() : 0L),
                pollResult.getWarehouseSchema());
    }

    /**
     * Decides what to do with a pipeline execution failure that the caller has already logged and
     * recorded on the run.
     *
     * @param pipeline                   pipeline whose run failed (kept for signature stability)
     * @param pipelineExecutionException the failure to classify
     * @throws JarvisRetriableException if the error type is {@code INTERMITTENT}
     * @throws CastledRuntimeException  if the error type is not one of the handled constants
     */
    private void handlePipelineExecutionException(Pipeline pipeline, PipelineExecutionException pipelineExecutionException) {
        switch (pipelineExecutionException.getPipelineError().getPipelineErrorType()) {
            case INTERMITTENT:
                throw new JarvisRetriableException(pipelineExecutionException.getErrorMessage());
            case INTERNAL:
            case USER_ACTIION_REQUIRED:
                // Nothing further to do: the run is already marked failed and the error logged.
                return;
            default:
                throw new CastledRuntimeException(String.format("Invalid error type: %s", pipelineExecutionException.getPipelineError().getPipelineErrorType()));
        }
    }
}
