package io.castled.services;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.castled.ObjectRegistry;
import io.castled.apps.ExternalAppConnector;
import io.castled.apps.ExternalAppService;
import io.castled.apps.ExternalAppType;
import io.castled.apps.dtos.AppSyncConfigDTO;
import io.castled.apps.models.ExternalAppSchema;
import io.castled.caches.PipelineCache;
import io.castled.commons.models.PipelineSyncStats;
import io.castled.daos.ErrorReportsDAO;
import io.castled.daos.PipelineDAO;
import io.castled.daos.PipelineRunDAO;
import io.castled.dtos.PipelineConfigDTO;
import io.castled.dtos.PipelineSchema;
import io.castled.dtos.PipelineUpdateRequest;
import io.castled.errors.PipelineErrorAndSample;
import io.castled.errors.PipelineRunErrors;
import io.castled.events.CastledEventsClient;
import io.castled.events.pipelineevents.PipelineEvent;
import io.castled.events.pipelineevents.PipelineEventType;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.jarvis.JarvisTaskGroup;
import io.castled.jarvis.JarvisTaskType;
import io.castled.jarvis.taskmanager.JarvisTasksClient;
import io.castled.jarvis.taskmanager.models.RetryCriteria;
import io.castled.jarvis.taskmanager.models.requests.TaskCreateRequest;
import io.castled.misc.PipelineScheduleManager;
import io.castled.models.AppAggregate;
import io.castled.models.ErrorReport;
import io.castled.models.Pipeline;
import io.castled.models.PipelineRun;
import io.castled.models.PipelineRunStage;
import io.castled.models.PipelineSyncStatus;
import io.castled.models.Warehouse;
import io.castled.models.WarehouseAggregate;
import io.castled.models.users.User;
import io.castled.pubsub.MessagePublisher;
import io.castled.pubsub.registry.PipelineUpdatedMessage;
import io.castled.resources.validators.ResourceAccessController;
import io.castled.schema.SchemaUtils;
import io.castled.schema.models.RecordSchema;
import io.castled.utils.JsonUtils;
import io.castled.utils.TimeUtils;
import io.castled.warehouses.WarehouseConnector;
import io.castled.warehouses.WarehouseService;
import io.castled.warehouses.WarehouseType;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.QuoteMode;
import org.jdbi.v3.core.Jdbi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.support.LocalizedResourceHelper;

/**
 * Service-layer facade for pipeline operations: creation, triggering, restart, pause/resume,
 * deletion, pipeline-run bookkeeping, error-report retrieval and schema discovery.
 *
 * <p>Persistence goes through on-demand Jdbi DAOs ({@link PipelineDAO}, {@link PipelineRunDAO},
 * {@link ErrorReportsDAO}); task submission goes through the {@link JarvisTasksClient} obtained
 * from {@link ObjectRegistry}.
 */
@Singleton
public class PipelineService {
    private static final Logger log = LoggerFactory.getLogger(PipelineService.class);

    private static final String UUID_PREFIX = "c";
    private static final String ERROR_CODE = "__castled__error_code";
    private static final String ERROR_MESSAGE = "__castled__error_message";
    private static final String ERROR_RECORD_COUNT = "__castled__record_count";

    /** Number of runs returned by {@link #getPipelineRuns(Long, int)} when the caller passes a limit of 0. */
    private static final int DEFAULT_PIPELINE_RUNS_LIMIT = 100;
    /** Lower bound, in minutes, applied to the expiry of a pipeline-run task. */
    private static final long MIN_PIPELINE_TASK_EXPIRY_MINUTES = 120L;
    /** Placeholder written to the CSV error report when a sample record has no value for a field. */
    private static final String MISSING_FIELD_VALUE = "N/A";

    private final PipelineDAO pipelineDAO;
    private final PipelineCache pipelineCache;
    private final PipelineRunDAO pipelineRunDAO;
    private final CastledEventsClient castledEventsClient;
    private final Map<WarehouseType, WarehouseConnector> warehouseConnectors;
    private final Map<ExternalAppType, ExternalAppConnector> appConnectors;
    private final WarehouseService warehouseService;
    private final ExternalAppService externalAppService;
    private final ErrorReportsDAO errorReportsDAO;
    private final MessagePublisher messagePublisher;
    private final ResourceAccessController resourceAccessController;

    @Inject
    public PipelineService(Jdbi jdbi, CastledEventsClient castledEventsClient, Map<WarehouseType, WarehouseConnector> map, WarehouseService warehouseService, PipelineCache pipelineCache, ExternalAppService externalAppService, ResourceAccessController resourceAccessController, MessagePublisher messagePublisher, Map<ExternalAppType, ExternalAppConnector> map2) {
        this.pipelineDAO = jdbi.onDemand(PipelineDAO.class);
        this.pipelineRunDAO = jdbi.onDemand(PipelineRunDAO.class);
        this.errorReportsDAO = jdbi.onDemand(ErrorReportsDAO.class);
        this.castledEventsClient = castledEventsClient;
        this.warehouseConnectors = map;
        this.warehouseService = warehouseService;
        this.pipelineCache = pipelineCache;
        this.externalAppService = externalAppService;
        this.resourceAccessController = resourceAccessController;
        this.messagePublisher = messagePublisher;
        this.appConnectors = map2;
    }

    /**
     * Validates and enriches the given configuration through the connector registered for the
     * target app's type, persists the pipeline and publishes a {@code PIPELINE_CREATED} event.
     *
     * @param pipelineConfigDTO requested pipeline configuration
     * @param user user passed through to the DAO as the pipeline's creator
     * @return id of the newly created pipeline
     * @throws ClientErrorException rethrown unchanged when validation rejects the request
     * @throws CastledRuntimeException wrapping any other failure
     */
    public Long createPipeline(PipelineConfigDTO pipelineConfigDTO, User user) {
        try {
            ExternalAppConnector appConnector = this.appConnectors.get(this.externalAppService.getExternalApp(pipelineConfigDTO.getAppId(), true).getType());
            PipelineConfigDTO enrichedConfig = appConnector.validateAndEnrichPipelineConfig(pipelineConfigDTO);
            validPipelineConfig(enrichedConfig);
            // NOTE(review): the separator constant looks like a decompiler substitution for a
            // plain string literal; kept as-is so the generated uuid format does not change.
            String pipelineUuid = UUID_PREFIX + UUID.randomUUID().toString().replaceAll("-", LocalizedResourceHelper.DEFAULT_SEPARATOR);
            Long pipelineId = Long.valueOf(this.pipelineDAO.createPipeline(enrichedConfig, user, pipelineUuid));
            this.castledEventsClient.publishPipelineEvent(new PipelineEvent(pipelineId, PipelineEventType.PIPELINE_CREATED));
            return pipelineId;
        } catch (ClientErrorException e) {
            // Client errors keep their type so the caller can see the original failure.
            log.warn("Create pipeline failed for app {} and warehouse {}", pipelineConfigDTO.getAppId(), pipelineConfigDTO.getWarehouseId(), e);
            throw e;
        } catch (Exception e) {
            log.warn("Create pipeline failed for app {} and warehouse {}", pipelineConfigDTO.getAppId(), pipelineConfigDTO.getWarehouseId(), e);
            throw new CastledRuntimeException(e);
        }
    }

    /**
     * Rejects configurations that do not select at least one primary key.
     *
     * @throws BadRequestException when the mapping is missing or has no primary keys
     */
    private void validPipelineConfig(PipelineConfigDTO pipelineConfigDTO) throws BadRequestException {
        // A missing mapping is reported as the same client error instead of surfacing as an NPE.
        if (pipelineConfigDTO.getMapping() == null || CollectionUtils.isEmpty(pipelineConfigDTO.getMapping().getPrimaryKeys())) {
            throw new BadRequestException("Atleast one primary key needs to be selected for creating a pipeline");
        }
    }

    /** Updates the name and schedule of the pipeline with the given id. */
    public void updatePipeline(Long l, PipelineUpdateRequest pipelineUpdateRequest) {
        this.pipelineDAO.updatePipeline(l, pipelineUpdateRequest.getName(), pipelineUpdateRequest.getSchedule());
    }

    /**
     * Submits a run task for the active pipeline {@code j} after delegating the access check
     * for {@code j2} to the {@link ResourceAccessController}.
     */
    public void triggerPipeline(long j, long j2) {
        Pipeline pipeline = getActivePipeline(Long.valueOf(j));
        this.resourceAccessController.validatePipelineAccess(pipeline, j2);
        doTriggerPipeline(pipeline);
    }

    /**
     * Creates a {@code PIPELINE_RUN} task keyed by the pipeline id.
     *
     * @throws CastledRuntimeException wrapping any failure while creating the task
     */
    private void doTriggerPipeline(Pipeline pipeline) {
        try {
            // Expiry is the schedule's execution time, but never less than the configured minimum.
            long expiryMillis = Math.max(TimeUtils.secondsToMillis(pipeline.getJobSchedule().getExecutionTime()), TimeUtils.minutesToMillis(MIN_PIPELINE_TASK_EXPIRY_MINUTES));
            ((JarvisTasksClient) ObjectRegistry.getInstance(JarvisTasksClient.class)).createTask(TaskCreateRequest.builder()
                    .group(JarvisTaskGroup.PIPELINE_RUN.name())
                    .type(JarvisTaskType.PIPELINE_RUN.name())
                    .expiry(Long.valueOf(expiryMillis))
                    .params(ImmutableMap.of("pipeline_id", pipeline.getId()))
                    .uniqueId(String.valueOf(pipeline.getId()))
                    .retryCriteria(new RetryCriteria(3, true))
                    .build());
        } catch (Exception e) {
            // Pass the exception so the stack trace is logged, matching triggerDummyRun.
            log.error("Trigger pipeline {} failed", pipeline.getId(), e);
            throw new CastledRuntimeException(e);
        }
    }

    /**
     * Asks the pipeline's warehouse connector to restart polling for the pipeline's uuid and
     * then submits a new run task.
     *
     * @param l pipeline id
     * @param l2 id forwarded to the access check
     */
    public void restartPipeline(Long l, Long l2) throws Exception {
        Pipeline pipeline = getActivePipeline(l, true);
        this.resourceAccessController.validatePipelineAccess(pipeline, l2.longValue());
        Warehouse warehouse = this.warehouseService.getWarehouse(pipeline.getWarehouseId(), true);
        WarehouseConnector warehouseConnector = this.warehouseConnectors.get(warehouse.getType());
        warehouseConnector.restartPoll(pipeline.getUuid(), warehouse.getConfig());
        doTriggerPipeline(pipeline);
    }

    /**
     * Looks up an active pipeline.
     *
     * @param l pipeline id
     * @param z when {@code true} the pipeline is read from {@link PipelineCache}, otherwise from the DAO
     */
    public Pipeline getActivePipeline(Long l, boolean z) {
        if (z) {
            return this.pipelineCache.getValue(l);
        }
        return this.pipelineDAO.getActivePipeline(l);
    }

    /** Records the given stage on pipeline run {@code l}. */
    public void updatePipelineRunstage(Long l, PipelineRunStage pipelineRunStage) {
        this.pipelineRunDAO.updatePipelineRunStage(l, pipelineRunStage);
    }

    /**
     * Marks pipeline {@code l} as deleted and publishes a {@code PIPELINE_DELETED} event, after
     * the access check for {@code l2}.
     */
    public void deletePipeline(Long l, Long l2) {
        this.resourceAccessController.validatePipelineAccess(getActivePipeline(l, true), l2.longValue());
        this.pipelineDAO.markPipelineDeleted(l);
        this.castledEventsClient.publishPipelineEvent(new PipelineEvent(l, PipelineEventType.PIPELINE_DELETED));
    }

    /**
     * Unschedules pipeline {@code j}, sets its sync status to {@code PAUSED} and publishes a
     * {@link PipelineUpdatedMessage}, after the access check for {@code l}.
     */
    public void pausePipeline(long j, Long l) {
        Long pipelineId = Long.valueOf(j);
        this.resourceAccessController.validatePipelineAccess(getActivePipeline(pipelineId, true), l.longValue());
        ((PipelineScheduleManager) ObjectRegistry.getInstance(PipelineScheduleManager.class)).unschedulePipeline(pipelineId);
        this.pipelineDAO.updateSyncStatus(pipelineId, PipelineSyncStatus.PAUSED);
        this.messagePublisher.publishMessage(new PipelineUpdatedMessage(pipelineId));
    }

    /**
     * Reschedules pipeline {@code j}, sets its sync status to {@code ACTIVE} and publishes a
     * {@link PipelineUpdatedMessage}, after the access check for {@code l}.
     */
    public void resumePipeline(long j, Long l) {
        Long pipelineId = Long.valueOf(j);
        this.resourceAccessController.validatePipelineAccess(getActivePipeline(pipelineId, true), l.longValue());
        ((PipelineScheduleManager) ObjectRegistry.getInstance(PipelineScheduleManager.class)).reschedulePipeline(pipelineId);
        this.pipelineDAO.updateSyncStatus(pipelineId, PipelineSyncStatus.ACTIVE);
        this.messagePublisher.publishMessage(new PipelineUpdatedMessage(pipelineId));
    }

    /** Looks up an active pipeline directly from the DAO, bypassing the cache. */
    public Pipeline getActivePipeline(Long l) {
        return getActivePipeline(l, false);
    }

    /** Creates a run for pipeline {@code l} with all sync counters initialised to zero. */
    public long createPipelineRun(Long l) {
        return this.pipelineRunDAO.createPipelineRun(l, new PipelineSyncStats(0L, 0L, 0L, 0L));
    }

    /** Marks pipeline run {@code l} as processed with its final stats. */
    public void markPipelineRunProcessed(Long l, PipelineSyncStats pipelineSyncStats) {
        this.pipelineRunDAO.markProcessed(l, pipelineSyncStats);
    }

    /** Marks pipeline run {@code l} as failed with the given message. */
    public void markPipelineRunFailed(Long l, String str) {
        this.pipelineRunDAO.markFailed(l, str);
    }

    /** Stores the given sync stats on pipeline run {@code l}. */
    public void updateSyncStats(Long l, PipelineSyncStats pipelineSyncStats) {
        this.pipelineRunDAO.updateSyncStatus(l, pipelineSyncStats);
    }

    /** Fetches the pipeline run with id {@code l}. */
    public PipelineRun getPipelineRun(Long l) {
        return this.pipelineRunDAO.getPipelineRun(l);
    }

    /**
     * Returns the last runs of pipeline {@code l}.
     *
     * @param i maximum number of runs; 0 selects the default of {@value #DEFAULT_PIPELINE_RUNS_LIMIT}
     */
    public List<PipelineRun> getPipelineRuns(Long l, int i) {
        int limit = (i == 0) ? DEFAULT_PIPELINE_RUNS_LIMIT : i;
        return this.pipelineRunDAO.getLastPipelineRuns(l, limit);
    }

    /**
     * Returns the most recent run of pipeline {@code l}.
     *
     * @return the latest run, or {@code null} when the pipeline has no runs
     */
    public PipelineRun getLastRun(Long l) {
        List<PipelineRun> latestRuns = getPipelineRuns(l, 1);
        return CollectionUtils.isEmpty(latestRuns) ? null : latestRuns.get(0);
    }

    /**
     * Parses the stored error report of pipeline run {@code l}.
     *
     * <p>The report is read as newline-separated JSON: the first line is a JSON array of sample
     * field names and every following line is one serialized {@link PipelineErrorAndSample}.
     *
     * @return the parsed errors; an empty result when no report (or an empty report) is stored
     */
    public PipelineRunErrors getPipelineRunErrors(Long l) {
        ErrorReport errorReport = this.errorReportsDAO.getErrorReport(l);
        String report = (errorReport == null) ? null : errorReport.getReport();
        if (report == null || report.isEmpty()) {
            return new PipelineRunErrors(new ArrayList<>(), new ArrayList<>());
        }
        // Accept both "\n" and "\r\n" so parsing does not depend on the line separator of the
        // JVM that reads the report.
        String[] reportLines = report.split("\\r?\\n");
        if (reportLines.length == 0) {
            return new PipelineRunErrors(new ArrayList<>(), new ArrayList<>());
        }
        @SuppressWarnings("unchecked")
        List<String> sampleFields = (List<String>) JsonUtils.jsonStringToTypeReference(reportLines[0], new TypeReference<List<String>>() {
        });
        List<PipelineErrorAndSample> errorAndSamples = new ArrayList<>(reportLines.length - 1);
        for (int idx = 1; idx < reportLines.length; idx++) {
            errorAndSamples.add((PipelineErrorAndSample) JsonUtils.jsonStringToObject(reportLines[idx], PipelineErrorAndSample.class));
        }
        return new PipelineRunErrors(sampleFields, errorAndSamples);
    }

    /**
     * Renders the error report of pipeline run {@code l} as CSV with every value quoted.
     *
     * <p>Columns are the report's sample fields followed by the error code, error message and
     * record count. Fields absent from a sample record are written as {@code "N/A"}. The CSV is
     * built fully in memory and encoded as UTF-8 before the {@link StreamingOutput} is returned.
     *
     * @throws IOException if writing a CSV record fails
     */
    public StreamingOutput downloadErrorReport(Long l) throws IOException {
        PipelineRunErrors pipelineRunErrors = getPipelineRunErrors(l);
        List<String> sampleFields = pipelineRunErrors.getSampleFields();
        List<String> headerFields = getErrorReportFields(sampleFields);
        CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader(headerFields.toArray(new String[0])).withQuoteMode(QuoteMode.ALL);

        StringBuilder csvContent = new StringBuilder();
        // try-with-resources closes the printer even when a record fails to print.
        try (CSVPrinter csvPrinter = new CSVPrinter(csvContent, csvFormat)) {
            for (PipelineErrorAndSample errorAndSample : pipelineRunErrors.getErrorAndSamples()) {
                List<String> row = new ArrayList<>(headerFields.size());
                for (String sampleField : sampleFields) {
                    row.add(Optional.ofNullable(errorAndSample.getRecord().get(sampleField)).map(Object::toString).orElse(MISSING_FIELD_VALUE));
                }
                row.add(errorAndSample.getErrorCode().name());
                row.add(errorAndSample.getDescription());
                row.add(String.valueOf(errorAndSample.getRecordCount()));
                csvPrinter.printRecord(row);
            }
            csvPrinter.flush();
        }

        // Encode with an explicit charset; the no-arg getBytes() uses the platform default.
        byte[] csvBytes = csvContent.toString().getBytes(StandardCharsets.UTF_8);
        return outputStream -> {
            outputStream.write(csvBytes);
            outputStream.flush();
        };
    }

    /** Fetches the pipeline with id {@code l} from the DAO. */
    public Pipeline getPipeline(Long l) {
        return this.pipelineDAO.getPipeline(l);
    }

    /** Returns a copy of {@code list} with the three error-report columns appended. */
    private List<String> getErrorReportFields(List<String> list) {
        List<String> reportFields = new ArrayList<>(list);
        reportFields.add(ERROR_CODE);
        reportFields.add(ERROR_MESSAGE);
        reportFields.add(ERROR_RECORD_COUNT);
        return reportFields;
    }

    /**
     * Creates a {@code DUMMY} task in the {@code OTHERS} group.
     *
     * @throws CastledRuntimeException wrapping any failure while creating the task
     */
    public void triggerDummyRun() {
        try {
            ((JarvisTasksClient) ObjectRegistry.getInstance(JarvisTasksClient.class)).createTask(TaskCreateRequest.builder()
                    .type(JarvisTaskType.DUMMY.name())
                    .group(JarvisTaskGroup.OTHERS.name())
                    .expiry(Long.valueOf(TimeUtils.minutesToMillis(1345L)))
                    .retryCriteria(new RetryCriteria(3, true))
                    .build());
        } catch (Exception e) {
            log.error("Trigger dummy run failed", e);
            throw new CastledRuntimeException(e);
        }
    }

    /**
     * Fetches the warehouse schema for the source query and the app object schema concurrently
     * and combines them into a {@link PipelineSchema}.
     *
     * <p>A two-thread pool is created per call and always shut down before returning.
     *
     * @throws Exception if either schema fetch fails or the calling thread is interrupted
     */
    public PipelineSchema getPipelineSchema(AppSyncConfigDTO appSyncConfigDTO) throws Exception {
        ExecutorService schemaFetchPool = Executors.newFixedThreadPool(2, new ThreadFactoryBuilder().setNameFormat("pipeline-schema-fetch-%d").build());
        try {
            Future<RecordSchema> warehouseSchemaFuture = schemaFetchPool.submit(() -> {
                return this.warehouseService.fetchSchema(appSyncConfigDTO.getWarehouseId(), appSyncConfigDTO.getSourceQuery());
            });
            Future<ExternalAppSchema> appSchemaFuture = schemaFetchPool.submit(() -> {
                return this.externalAppService.getObjectSchema(appSyncConfigDTO.getAppId(), appSyncConfigDTO.getAppSyncConfig());
            });
            RecordSchema warehouseSchema = enrichWarehouseSchema(appSyncConfigDTO, warehouseSchemaFuture);
            ExternalAppSchema appSchema = appSchemaFuture.get();
            return new PipelineSchema(SchemaUtils.transformToSimpleSchema(warehouseSchema), SchemaUtils.transformToSimpleSchema(appSchema.getAppSchema()), appSchema.getPkEligibles());
        } finally {
            // Runs on success and failure alike, so pool threads never outlive the call.
            schemaFetchPool.shutdownNow();
        }
    }

    /**
     * Waits for the warehouse schema and passes it through the app connector's enrichment hook.
     */
    private RecordSchema enrichWarehouseSchema(AppSyncConfigDTO appSyncConfigDTO, Future<RecordSchema> future) throws InterruptedException, ExecutionException {
        ExternalAppConnector appConnector = this.appConnectors.get(this.externalAppService.getExternalApp(appSyncConfigDTO.getAppId(), true).getType());
        return appConnector.enrichWarehouseASchema(appSyncConfigDTO, future.get());
    }

    /**
     * Lists pipelines for {@code l}; when {@code l2} is non-null it is passed to the DAO as an
     * additional filter.
     */
    public List<Pipeline> listPipelines(Long l, Long l2) {
        if (l2 == null) {
            return this.pipelineDAO.listPipelines(l);
        }
        return this.pipelineDAO.listPipelines(l, l2);
    }

    /** Returns the DAO's per-warehouse pipeline aggregates for {@code l}. */
    public List<WarehouseAggregate> getWarehouseAggregates(Long l) {
        return this.pipelineDAO.aggregateByWarehouse(l);
    }

    /** Returns the DAO's pipeline count for warehouse {@code l}. */
    public int getWarehousePipelines(Long l) {
        return this.pipelineDAO.getWarehousePipelines(l);
    }

    /** Returns the DAO's pipeline count for app {@code l}. */
    public int getAppPipelines(Long l) {
        return this.pipelineDAO.getAppPipelines(l);
    }

    /** Returns the DAO's per-app pipeline aggregates for {@code l}. */
    public List<AppAggregate> getAppAggregates(Long l) {
        return this.pipelineDAO.aggregateByApp(l);
    }
}
