package io.castled.warehouses.connectors.bigquery;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.castled.ObjectRegistry;
import io.castled.commons.models.FileStorageNamespace;
import io.castled.constants.ConnectorExecutionConstants;
import io.castled.filemanager.RawFileWriter;
import io.castled.filestorage.GcsClient;
import io.castled.models.QueryMode;
import io.castled.schema.models.FieldSchema;
import io.castled.schema.models.Tuple;
import io.castled.utils.FileUtils;
import io.castled.utils.JsonUtils;
import io.castled.utils.SizeUtils;
import io.castled.warehouses.WarehouseConfig;
import io.castled.warehouses.WarehouseConnectorConfig;
import io.castled.warehouses.WarehouseSyncFailureListener;
import io.castled.warehouses.connectors.bigquery.daos.BQSnapshotTrackerDAO;
import io.castled.warehouses.connectors.bigquery.gcp.GcpClientFactory;
import io.castled.warehouses.models.WarehousePollContext;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.attribute.FileAttribute;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.jdbi.v3.core.Jdbi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/castled/warehouses/connectors/bigquery/BQSyncFailureListener.class */
public class BQSyncFailureListener extends WarehouseSyncFailureListener {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) BQSyncFailureListener.class);
    private final WarehousePollContext warehousePollContext;
    private final RawFileWriter rawFileWriter;
    private final BigQueryWarehouseConfig warehouseConfig;
    private final String gcsUploadDirectory;
    private final GcsClient gcsClient;
    private final String bucket;
    private final List<String> gcsFileUrls;
    private final BQSnapshotTrackerDAO bqSnapshotTrackerDAO;
    private int totalBytes;
    private long failedRecords;

    public BQSyncFailureListener(WarehousePollContext warehousePollContext) {
        super(warehousePollContext);
        this.gcsFileUrls = Lists.newArrayList();
        this.totalBytes = 0;
        this.failedRecords = 0L;
        this.warehousePollContext = warehousePollContext;
        this.rawFileWriter = new RawFileWriter(SizeUtils.convertMBToBytes(50L), this.failureRecordsDirectory, () -> {
            return UUID.randomUUID().toString();
        });
        this.warehouseConfig = (BigQueryWarehouseConfig) warehousePollContext.getWarehouseConfig();
        this.gcsClient = ((GcpClientFactory) ObjectRegistry.getInstance(GcpClientFactory.class)).getGcsClient(this.warehouseConfig.getServiceAccount(), this.warehouseConfig.getProjectId());
        this.gcsUploadDirectory = GcsClient.constructObjectKey(Lists.newArrayList(FileStorageNamespace.PIPELINE_FAILED_RECORDS.getNamespace(), warehousePollContext.getPipelineUUID(), String.valueOf(warehousePollContext.getPipelineRunId())));
        this.bucket = this.warehouseConfig.getBucketName();
        this.bqSnapshotTrackerDAO = (BQSnapshotTrackerDAO) ((Jdbi) ObjectRegistry.getInstance(Jdbi.class)).onDemand(BQSnapshotTrackerDAO.class);
    }

    @Override // io.castled.warehouses.WarehouseSyncFailureListener
    public void cleanupResources(String str, Long l, WarehouseConfig warehouseConfig) {
        BigQueryWarehouseConfig bigQueryWarehouseConfig = (BigQueryWarehouseConfig) warehouseConfig;
        ((GcpClientFactory) ObjectRegistry.getInstance(GcpClientFactory.class)).getGcsClient(bigQueryWarehouseConfig.getServiceAccount(), bigQueryWarehouseConfig.getProjectId()).deleteDirectory(bigQueryWarehouseConfig.getBucketName(), this.gcsUploadDirectory);
        FileUtils.deleteDirectory(this.failureRecordsDirectory);
    }

    @Override // io.castled.warehouses.WarehouseSyncFailureListener
    public void doFlush() throws Exception {
        if (this.warehousePollContext.getQueryMode() == QueryMode.FULL_LOAD) {
            return;
        }
        if (this.totalBytes > 0) {
            this.rawFileWriter.close();
            uploadFilesToGCS();
        }
        BigQuery bigQuery = ((GcpClientFactory) ObjectRegistry.getInstance(GcpClientFactory.class)).getBigQuery(this.warehouseConfig.getServiceAccount(), this.warehouseConfig.getProjectId());
        BQSnapshotTracker snapshotTracker = this.bqSnapshotTrackerDAO.getSnapshotTracker(this.warehousePollContext.getPipelineUUID());
        if (this.failedRecords > 0) {
            removeFailedRecordsFromSnapshot(bigQuery, snapshotTracker);
        }
        commitSnapshot(bigQuery, snapshotTracker);
    }

    private void commitSnapshot(BigQuery bigQuery, BQSnapshotTracker bQSnapshotTracker) {
        String committedSnapshot = bQSnapshotTracker.getCommittedSnapshot();
        this.bqSnapshotTrackerDAO.commitSnapshot(this.warehousePollContext.getPipelineUUID());
        if (committedSnapshot != null) {
            bigQuery.delete(TableId.of("castled", committedSnapshot));
        }
    }

    private void removeFailedRecordsFromSnapshot(BigQuery bigQuery, BQSnapshotTracker bQSnapshotTracker) throws Exception {
        String failedRecordsTable = ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID());
        bigQuery.query(QueryJobConfiguration.newBuilder(String.format("select %s from %s.%s limit 0", String.join(",", this.trackableFields), "castled", bQSnapshotTracker.getUncommittedSnapshot())).setDestinationTable(TableId.of("castled", failedRecordsTable)).setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE).mo10531build(), new BigQuery.JobOption[0]);
        BigQueryUtils.runJob(bigQuery.create(JobInfo.newBuilder(LoadJobConfiguration.newBuilder(TableId.of("castled", ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID())), this.gcsFileUrls, FormatOptions.json()).setIgnoreUnknownValues((Boolean) true).mo10531build()).build(), new BigQuery.JobOption[0]));
        this.gcsClient.deleteDirectory(this.bucket, this.gcsUploadDirectory);
        StringBuilder sb = new StringBuilder(String.format("MERGE %s.%s T using %s.%s S on 1 = 1", "castled", bQSnapshotTracker.getUncommittedSnapshot(), "castled", failedRecordsTable));
        for (String str : this.trackableFields) {
            sb.append(String.format(" AND (T.%s = S.%s OR (T.%s IS NULL and S.%s IS NULL))", str, str, str, str));
        }
        sb.append(" WHEN MATCHED THEN DELETE");
        BigQueryUtils.runJob(bigQuery.create(JobInfo.newBuilder(QueryJobConfiguration.newBuilder(sb.toString()).mo10531build()).build(), new BigQuery.JobOption[0]));
        bigQuery.delete(TableId.of("castled", ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID())));
    }

    @Override // io.castled.warehouses.WarehouseSyncFailureListener
    public synchronized void doWriteRecord(Tuple tuple) throws Exception {
        if (this.warehousePollContext.getQueryMode() == QueryMode.FULL_LOAD) {
            return;
        }
        byte[] bytes = getCopyableRecord(tuple).getBytes();
        this.rawFileWriter.writeRecord(bytes);
        this.totalBytes += bytes.length;
        this.failedRecords++;
        if (this.totalBytes > SizeUtils.convertGBToBytes(((WarehouseConnectorConfig) ObjectRegistry.getInstance(WarehouseConnectorConfig.class)).getFailedRecordBufferSize())) {
            this.rawFileWriter.close();
            uploadFilesToGCS();
            if (Files.exists(this.failureRecordsDirectory, new LinkOption[0])) {
                return;
            }
            Files.createDirectory(this.failureRecordsDirectory, new FileAttribute[0]);
        }
    }

    private void uploadFilesToGCS() throws Exception {
        this.gcsFileUrls.addAll(this.gcsClient.uploadDirectory(this.bucket, this.failureRecordsDirectory, this.gcsUploadDirectory));
        FileUtils.deleteDirectory(this.failureRecordsDirectory);
    }

    private String getCopyableRecord(Tuple tuple) {
        BQWarehouseCopyAdaptor bQWarehouseCopyAdaptor = (BQWarehouseCopyAdaptor) ObjectRegistry.getInstance(BQWarehouseCopyAdaptor.class);
        HashMap newHashMap = Maps.newHashMap();
        for (FieldSchema fieldSchema : this.warehousePollContext.getWarehouseSchema().getFieldSchemas()) {
            if (this.trackableFields.contains(fieldSchema.getName())) {
                newHashMap.put(fieldSchema.getName(), bQWarehouseCopyAdaptor.constructSyncableRecord(tuple.getValue(fieldSchema.getName()), fieldSchema.getSchema()));
            }
        }
        return JsonUtils.objectToString(newHashMap);
    }
}
