package io.castled.warehouses.connectors.bigquery;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.storage.Blob;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.castled.ObjectRegistry;
import io.castled.commons.models.FileFormat;
import io.castled.commons.models.FileStorageNamespace;
import io.castled.commons.streams.GcsFilesRecordInputStream;
import io.castled.commons.streams.RecordInputStream;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.filestorage.GcsClient;
import io.castled.models.QueryMode;
import io.castled.schema.models.RecordSchema;
import io.castled.utils.FileUtils;
import io.castled.warehouses.WarehouseConfig;
import io.castled.warehouses.WarehouseDataPoller;
import io.castled.warehouses.connectors.bigquery.daos.BQSnapshotTrackerDAO;
import io.castled.warehouses.connectors.bigquery.gcp.GcpClientFactory;
import io.castled.warehouses.models.WarehousePollContext;
import io.castled.warehouses.models.WarehousePollResult;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.collections4.CollectionUtils;
import org.jdbi.v3.core.Jdbi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link WarehouseDataPoller} implementation for BigQuery.
 *
 * <p>Records are fetched by running a BigQuery {@code EXPORT DATA} statement that writes gzipped
 * JSON files to the configured GCS bucket, and then streaming those files back through a
 * {@link GcsFilesRecordInputStream}.
 *
 * <p>For non-full-load query modes the user query is first materialised into a snapshot table in
 * the {@value #CASTLED_DATASET} dataset. When a previously committed snapshot table still exists,
 * only the rows present in the new snapshot but not in the committed one are exported.
 */
@Singleton
public class BigQueryDataPoller implements WarehouseDataPoller {
    private static final Logger log = LoggerFactory.getLogger(BigQueryDataPoller.class);

    /** Dataset that holds the per-pipeline snapshot tables. */
    private static final String CASTLED_DATASET = "castled";
    /** Snapshot table name pattern: pipeline UUID followed by the creation time in epoch millis. */
    private static final String SNAPSHOT_TABLE_FORMAT = "%s_snapshot_%d";
    private static final String SELECT_ALL_FORMAT = "select * from %s.%s";
    private static final String SNAPSHOT_DIFF_FORMAT = "select * from %s.%s except distinct select * from %s.%s";
    private static final String EXPORT_QUERY_FORMAT =
            "EXPORT DATA OPTIONS (uri = '%s',compression = 'GZIP', format='JSON', overwrite=true) AS %s";
    private static final String EXPORT_FILE_PATTERN = "*.json.gz";

    private final GcpClientFactory gcpClientFactory;
    private final BigQueryConnector bigQueryConnector;
    private final BigQueryExportJsonSchemaMapper bigQueryExportJsonSchemaMapper;
    private final BQSnapshotTrackerDAO bqSnapshotTrackerDAO;

    @Inject
    public BigQueryDataPoller(GcpClientFactory gcpClientFactory, BigQueryConnector bigQueryConnector,
                              BigQueryExportJsonSchemaMapper bigQueryExportJsonSchemaMapper, Jdbi jdbi) {
        this.gcpClientFactory = gcpClientFactory;
        this.bigQueryConnector = bigQueryConnector;
        this.bigQueryExportJsonSchemaMapper = bigQueryExportJsonSchemaMapper;
        this.bqSnapshotTrackerDAO = jdbi.onDemand(BQSnapshotTrackerDAO.class);
    }

    /**
     * Runs a fresh poll for the given pipeline run.
     *
     * <p>In {@link QueryMode#FULL_LOAD} the pipeline query is exported directly. Otherwise a new
     * uncommitted snapshot table is created from the query, stale snapshot tables of this pipeline
     * are dropped, and the difference against the committed snapshot (if it still exists) is exported.
     *
     * @param warehousePollContext pipeline, run and warehouse details of the poll
     * @return the record stream over the exported files together with the query schema
     * @throws CastledRuntimeException wrapping any failure raised while polling
     */
    @Override // io.castled.warehouses.WarehouseDataPoller
    public WarehousePollResult pollRecords(WarehousePollContext warehousePollContext) {
        try {
            BigQueryWarehouseConfig config = (BigQueryWarehouseConfig) warehousePollContext.getWarehouseConfig();
            BigQuery bigQuery = this.gcpClientFactory.getBigQuery(config.getServiceAccount(), config.getProjectId());
            if (warehousePollContext.getQueryMode() == QueryMode.FULL_LOAD) {
                return doFullLoad(bigQuery, warehousePollContext);
            }

            List<String> existingTables = BigQueryUtils.listTables(CASTLED_DATASET, bigQuery);
            if (CollectionUtils.isEmpty(existingTables)) {
                BigQueryUtils.getOrCreateDataset(CASTLED_DATASET, bigQuery, config.getLocation());
            }

            String pipelineUUID = warehousePollContext.getPipelineUUID();
            BQSnapshotTracker snapshotTracker = getOrCreateSnapshotTracker(pipelineUUID);
            dropOrphanedTables(existingTables, snapshotTracker, pipelineUUID, bigQuery);

            String uncommittedSnapshot = createUncommittedSnapshot(bigQuery, warehousePollContext);
            snapshotTracker.setUncommittedSnapshot(uncommittedSnapshot);

            RecordSchema querySchema = this.bigQueryConnector.getQuerySchema(config, selectAllFrom(uncommittedSnapshot));
            RecordInputStream recordStream = createRecordStream(bigQuery, config, warehousePollContext,
                    getDataFetchQuery(existingTables, snapshotTracker), querySchema);
            return WarehousePollResult.builder().recordInputStream(recordStream).warehouseSchema(querySchema).build();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            log.error("Data poll failed for pipeline {} and pipeline run {}",
                    warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId(), e);
            throw new CastledRuntimeException(e);
        }
    }

    /**
     * Exports the result of the pipeline query as-is, without any snapshot bookkeeping.
     *
     * @param bigQuery             client used to run the export job
     * @param warehousePollContext pipeline, run and warehouse details of the poll
     * @return the record stream over the exported files together with the query schema
     * @throws Exception if the schema lookup or the export fails
     */
    public WarehousePollResult doFullLoad(BigQuery bigQuery, WarehousePollContext warehousePollContext) throws Exception {
        BigQueryWarehouseConfig config = (BigQueryWarehouseConfig) warehousePollContext.getWarehouseConfig();
        String query = warehousePollContext.getQuery();
        RecordSchema querySchema = this.bigQueryConnector.getQuerySchema(config, query);
        RecordInputStream recordStream = createRecordStream(bigQuery, config, warehousePollContext, query, querySchema);
        return WarehousePollResult.builder().recordInputStream(recordStream).warehouseSchema(querySchema).build();
    }

    /**
     * Resumes a poll from files already exported to GCS for this pipeline run, falling back to a
     * fresh {@link #pollRecords} when there is nothing to resume or the resume attempt fails.
     *
     * <p>The fallback poll runs outside the resume {@code try} block so that a failing
     * {@link #pollRecords} is not caught and executed a second time.
     *
     * @param warehousePollContext pipeline, run and warehouse details of the poll
     * @return a result marked as resumed, or the result of a fresh poll
     */
    @Override // io.castled.warehouses.WarehouseDataPoller
    public WarehousePollResult resumePoll(WarehousePollContext warehousePollContext) {
        Optional<WarehousePollResult> resumed;
        try {
            resumed = tryResume(warehousePollContext);
        } catch (Exception e) {
            log.error("Data poll resume failed for pipeline {} and pipeline run {}",
                    warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId(), e);
            resumed = Optional.empty();
        }
        return resumed.orElseGet(() -> pollRecords(warehousePollContext));
    }

    /** Builds a resumed result from already exported files; empty when there is nothing to resume from. */
    private Optional<WarehousePollResult> tryResume(WarehousePollContext warehousePollContext) throws Exception {
        BigQueryWarehouseConfig config = (BigQueryWarehouseConfig) warehousePollContext.getWarehouseConfig();
        GcsClient gcsClient = this.gcpClientFactory.getGcsClient(config.getServiceAccount(), config.getProjectId());
        List<Blob> exportedFiles = gcsClient.listObjects(config.getBucketName(),
                getPipelineRunGcsUnloadDir(warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId()));
        if (CollectionUtils.isEmpty(exportedFiles)) {
            return Optional.empty();
        }

        String schemaQuery;
        if (warehousePollContext.getQueryMode() == QueryMode.FULL_LOAD) {
            schemaQuery = warehousePollContext.getQuery();
        } else {
            BQSnapshotTracker snapshotTracker = this.bqSnapshotTrackerDAO.getSnapshotTracker(warehousePollContext.getPipelineUUID());
            if (snapshotTracker == null || snapshotTracker.getUncommittedSnapshot() == null) {
                // Without an uncommitted snapshot there is no table to derive the schema from.
                return Optional.empty();
            }
            schemaQuery = selectAllFrom(snapshotTracker.getUncommittedSnapshot());
        }

        RecordSchema querySchema = this.bigQueryConnector.getQuerySchema(config, schemaQuery);
        RecordInputStream recordStream = newGcsRecordStream(querySchema, exportedFiles, gcsClient, warehousePollContext);
        return Optional.of(WarehousePollResult.builder()
                .recordInputStream(recordStream)
                .warehouseSchema(querySchema)
                .resumed(true)
                .build());
    }

    /**
     * Deletes every table of this pipeline (name starting with the pipeline UUID) except the
     * committed snapshot.
     */
    private void dropOrphanedTables(List<String> existingTables, BQSnapshotTracker snapshotTracker,
                                    String pipelineUUID, BigQuery bigQuery) {
        if (CollectionUtils.isEmpty(existingTables)) {
            return;
        }
        String committedSnapshot = snapshotTracker.getCommittedSnapshot();
        for (String table : existingTables) {
            if (table.startsWith(pipelineUUID) && !table.equals(committedSnapshot)) {
                bigQuery.delete(TableId.of(CASTLED_DATASET, table));
            }
        }
    }

    /** Loads the snapshot tracker of the pipeline, persisting an empty one when none exists yet. */
    private BQSnapshotTracker getOrCreateSnapshotTracker(String pipelineUUID) {
        BQSnapshotTracker existing = this.bqSnapshotTrackerDAO.getSnapshotTracker(pipelineUUID);
        if (existing != null) {
            return existing;
        }
        return BQSnapshotTracker.builder()
                .id(Long.valueOf(this.bqSnapshotTrackerDAO.createPipelineSnapshot(pipelineUUID, null, null)))
                .pipelineUUID(pipelineUUID)
                .build();
    }

    /**
     * Materialises the pipeline query into a new snapshot table and records it as the uncommitted
     * snapshot of the pipeline.
     *
     * @return the name of the created snapshot table
     */
    private String createUncommittedSnapshot(BigQuery bigQuery, WarehousePollContext warehousePollContext) throws Exception {
        String pipelineUUID = warehousePollContext.getPipelineUUID();
        TableId snapshotTable = TableId.of(CASTLED_DATASET,
                String.format(SNAPSHOT_TABLE_FORMAT, pipelineUUID, System.currentTimeMillis()));
        QueryJobConfiguration snapshotQuery = QueryJobConfiguration.newBuilder(warehousePollContext.getQuery())
                .setDestinationTable(snapshotTable)
                .setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE)
                .build();
        bigQuery.query(snapshotQuery);
        this.bqSnapshotTrackerDAO.updateUncommittedSnapshot(pipelineUUID, snapshotTable.getTable());
        return snapshotTable.getTable();
    }

    /**
     * Exports the result of {@code query} to the pipeline run's GCS unload location and returns a
     * stream over the exported files.
     */
    private RecordInputStream createRecordStream(BigQuery bigQuery, BigQueryWarehouseConfig bigQueryWarehouseConfig,
                                                 WarehousePollContext warehousePollContext, String query,
                                                 RecordSchema recordSchema) throws Exception {
        GcsClient gcsClient = this.gcpClientFactory.getGcsClient(
                bigQueryWarehouseConfig.getServiceAccount(), bigQueryWarehouseConfig.getProjectId());

        String exportUri = GcsClient.constructGcsPath(bigQueryWarehouseConfig.getBucketName(), Lists.newArrayList(
                FileStorageNamespace.PIPELINE_UNLOADS.getNamespace(),
                warehousePollContext.getPipelineUUID(),
                String.valueOf(warehousePollContext.getPipelineRunId()),
                EXPORT_FILE_PATTERN));
        QueryJobConfiguration exportQuery =
                QueryJobConfiguration.newBuilder(String.format(EXPORT_QUERY_FORMAT, exportUri, query)).build();
        BigQueryUtils.runJob(bigQuery.create(JobInfo.newBuilder(exportQuery).build()));

        List<Blob> exportedFiles = gcsClient.listObjects(bigQueryWarehouseConfig.getBucketName(),
                getPipelineRunGcsUnloadDir(warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId()));
        return newGcsRecordStream(recordSchema, exportedFiles, gcsClient, warehousePollContext);
    }

    /** Wraps exported GCS files of a pipeline run in a JSON record stream. */
    private RecordInputStream newGcsRecordStream(RecordSchema recordSchema, List<Blob> exportedFiles,
                                                 GcsClient gcsClient, WarehousePollContext warehousePollContext) throws Exception {
        // NOTE(review): the trailing "20, true" arguments are defined by GcsFilesRecordInputStream;
        // confirm their meaning there before changing them.
        return new GcsFilesRecordInputStream(recordSchema, this.bigQueryExportJsonSchemaMapper, exportedFiles,
                FileFormat.JSON,
                getPipelineRunUnloadDirectory(warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId()),
                gcsClient, 20, true);
    }

    /**
     * Picks the query to export: the diff between the uncommitted and the committed snapshot when
     * the committed snapshot table is among {@code existingTables}, otherwise the whole uncommitted
     * snapshot.
     */
    private String getDataFetchQuery(List<String> existingTables, BQSnapshotTracker snapshotTracker) {
        String committedSnapshot = snapshotTracker.getCommittedSnapshot();
        String uncommittedSnapshot = snapshotTracker.getUncommittedSnapshot();
        boolean committedTableExists = committedSnapshot != null
                && existingTables != null
                && existingTables.contains(committedSnapshot);
        if (committedTableExists) {
            return String.format(SNAPSHOT_DIFF_FORMAT, CASTLED_DATASET, uncommittedSnapshot, CASTLED_DATASET, committedSnapshot);
        }
        return selectAllFrom(uncommittedSnapshot);
    }

    /** Builds {@code select * from castled.<table>}. */
    private static String selectAllFrom(String table) {
        return String.format(SELECT_ALL_FORMAT, CASTLED_DATASET, table);
    }

    /**
     * Removes the GCS export files of a pipeline run and the directory returned by
     * {@code getPipelineRunUnloadDirectory} for that run.
     *
     * @param warehousePollContext pipeline, run and warehouse details of the poll to clean up
     */
    @Override // io.castled.warehouses.WarehouseDataPoller
    public void cleanupPipelineRunResources(WarehousePollContext warehousePollContext) {
        BigQueryWarehouseConfig config = (BigQueryWarehouseConfig) warehousePollContext.getWarehouseConfig();
        String pipelineUUID = warehousePollContext.getPipelineUUID();
        Long pipelineRunId = warehousePollContext.getPipelineRunId();

        this.gcpClientFactory.getGcsClient(config.getServiceAccount(), config.getProjectId())
                .deleteDirectory(config.getBucketName(), getPipelineRunGcsUnloadDir(pipelineUUID, pipelineRunId));
        FileUtils.deleteDirectory(getPipelineRunUnloadDirectory(pipelineUUID, pipelineRunId));
    }

    /**
     * Returns the GCS object key under which all runs of a pipeline are exported.
     *
     * @param pipelineUUID UUID of the pipeline
     */
    public String getPipelineGcsUnloadDir(String pipelineUUID) {
        return GcsClient.constructObjectKey(
                Lists.newArrayList(FileStorageNamespace.PIPELINE_UNLOADS.getNamespace(), pipelineUUID));
    }

    /**
     * Returns the GCS object key under which a single pipeline run is exported.
     *
     * @param pipelineUUID  UUID of the pipeline
     * @param pipelineRunId id of the pipeline run
     */
    public String getPipelineRunGcsUnloadDir(String pipelineUUID, Long pipelineRunId) {
        return GcsClient.constructObjectKey(Lists.newArrayList(
                FileStorageNamespace.PIPELINE_UNLOADS.getNamespace(), pipelineUUID, String.valueOf(pipelineRunId)));
    }

    /**
     * Removes all GCS export files of a pipeline and drops the pipeline's snapshot tables.
     *
     * <p>Only tables whose name starts with the pipeline UUID are dropped; the dataset is shared,
     * so tables belonging to other pipelines are left in place.
     *
     * @param pipelineUUID    UUID of the pipeline being removed
     * @param warehouseConfig BigQuery warehouse configuration of the pipeline
     */
    @Override // io.castled.warehouses.WarehouseDataPoller
    public void cleanupPipelineResources(String pipelineUUID, WarehouseConfig warehouseConfig) {
        BigQueryWarehouseConfig config = (BigQueryWarehouseConfig) warehouseConfig;
        this.gcpClientFactory.getGcsClient(config.getServiceAccount(), config.getProjectId())
                .deleteDirectory(config.getBucketName(), getPipelineGcsUnloadDir(pipelineUUID));

        BigQuery bigQuery = this.gcpClientFactory.getBigQuery(config.getServiceAccount(), config.getProjectId());
        List<String> existingTables = BigQueryUtils.listTables(CASTLED_DATASET, bigQuery);
        if (CollectionUtils.isEmpty(existingTables)) {
            return;
        }
        for (String table : existingTables) {
            if (table.startsWith(pipelineUUID)) {
                bigQuery.delete(TableId.of(CASTLED_DATASET, table));
            }
        }
    }
}
