package io.castled.warehouses.connectors.postgres;

import com.google.common.collect.Lists;
import io.castled.ObjectRegistry;
import io.castled.constants.ConnectorExecutionConstants;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.schema.models.Field;
import io.castled.schema.models.SchemaType;
import io.castled.schema.models.Tuple;
import io.castled.utils.StringUtils;
import io.castled.warehouses.WarehouseConfig;
import io.castled.warehouses.WarehouseSyncFailureListener;
import io.castled.warehouses.models.WarehousePollContext;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/castled/warehouses/connectors/postgres/PostgresSyncFailureListener.class */
public class PostgresSyncFailureListener extends WarehouseSyncFailureListener {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PostgresSyncFailureListener.class);
    private static final int MAX_BUFFERED_RECORDS = 100;
    private final List<Tuple> bufferedRecords;
    private final Connection connection;
    private long failedRecords;
    private boolean failedRecordsTableCreated;

    public PostgresSyncFailureListener(WarehousePollContext warehousePollContext) throws Exception {
        super(warehousePollContext);
        this.bufferedRecords = Lists.newArrayList();
        this.failedRecords = 0L;
        this.failedRecordsTableCreated = false;
        this.connection = ((PostgresWarehouseConnector) ObjectRegistry.getInstance(PostgresWarehouseConnector.class)).getConnection((PostgresWarehouseConfig) warehousePollContext.getWarehouseConfig());
    }

    @Override // io.castled.warehouses.WarehouseSyncFailureListener
    public synchronized void doWriteRecord(Tuple tuple) throws Exception {
        this.bufferedRecords.add(tuple);
        this.failedRecords++;
        if (this.bufferedRecords.size() >= 100) {
            insertBufferedRecords(this.bufferedRecords);
            this.bufferedRecords.clear();
        }
    }

    private void commitSnapshot() throws SQLException {
        this.connection.setAutoCommit(false);
        boolean autoCommit = this.connection.getAutoCommit();
        if (autoCommit) {
            this.connection.setAutoCommit(false);
        }
        String qualifiedUncommittedSnapshot = ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(this.warehousePollContext.getPipelineUUID());
        String qualifiedCommittedSnapshot = ConnectorExecutionConstants.getQualifiedCommittedSnapshot(this.warehousePollContext.getPipelineUUID());
        try {
            Statement createStatement = this.connection.createStatement();
            try {
                createStatement.execute(String.format("drop table if exists %s", qualifiedCommittedSnapshot));
                createStatement.execute(String.format("alter table %s rename to %s", qualifiedUncommittedSnapshot, ConnectorExecutionConstants.getCommittedSnapshot(this.warehousePollContext.getPipelineUUID())));
                if (createStatement != null) {
                    createStatement.close();
                }
                this.connection.commit();
                if (autoCommit) {
                    this.connection.setAutoCommit(true);
                }
            } finally {
            }
        } catch (Exception e) {
            this.connection.rollback();
            log.error("Committing snapshot for pipeline {} failed", this.warehousePollContext.getPipelineUUID(), e);
            throw new CastledRuntimeException(e);
        }
    }

    private void createFailedRecordsTable() throws SQLException {
        ((PostgresClient) ObjectRegistry.getInstance(PostgresClient.class)).createTableFromQuery(this.connection, ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID()), String.format("select %s from %s limit 0", String.join(",", this.trackableFields), ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(this.warehousePollContext.getPipelineUUID())), true, (PostgresTableProperties) ((PostgresWarehouseConnector) ObjectRegistry.getInstance(PostgresWarehouseConnector.class)).getSnapshotTableProperties(this.warehousePollContext.getPrimaryKeys()));
    }

    @Override // io.castled.warehouses.WarehouseSyncFailureListener
    public void cleanupResources(String str, Long l, WarehouseConfig warehouseConfig) {
        try {
            if (!this.connection.isClosed()) {
                this.connection.close();
            }
        } catch (Exception e) {
            log.error("Failed to close postgres connection for pipeline {}", str);
        }
    }

    @Override // io.castled.warehouses.WarehouseSyncFailureListener
    public void doFlush() throws Exception {
        if (this.bufferedRecords.size() > 0) {
            insertBufferedRecords(this.bufferedRecords);
        }
        if (this.failedRecords > 0) {
            removeFailedRecordsFromSnapshot();
        }
        commitSnapshot();
        this.connection.close();
    }

    private void insertBufferedRecords(List<Tuple> list) throws SQLException {
        if (!this.failedRecordsTableCreated) {
            createFailedRecordsTable();
            this.failedRecordsTableCreated = true;
        }
        String failedRecordsTable = ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID());
        Statement createStatement = this.connection.createStatement();
        try {
            for (Tuple tuple : list) {
                createStatement.execute(String.format("insert into %s(", failedRecordsTable) + ((String) tuple.getFields().stream().map((v0) -> {
                    return v0.getName();
                }).collect(Collectors.joining(","))) + ") values(" + ((String) tuple.getFields().stream().map(this::getQueryValue).collect(Collectors.joining(","))) + ")");
            }
            createStatement.executeBatch();
            if (createStatement != null) {
                createStatement.close();
            }
        } catch (Throwable th) {
            if (createStatement != null) {
                try {
                    createStatement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private String getQueryValue(Field field) {
        return field.getValue() == null ? "null" : field.getSchema().getType().equals(SchemaType.STRING) ? StringUtils.singleQuote((String) field.getValue()) : field.getValue().toString();
    }

    private void removeFailedRecordsFromSnapshot() throws SQLException {
        String qualifiedUncommittedSnapshot = ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(this.warehousePollContext.getPipelineUUID());
        String failedRecordsTable = ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID());
        StringBuilder sb = new StringBuilder(String.format("delete from %s using %s where 1 = 1", qualifiedUncommittedSnapshot, failedRecordsTable));
        for (String str : this.trackableFields) {
            sb.append(String.format(" AND (%s.%s = %s.%s)", failedRecordsTable, str, qualifiedUncommittedSnapshot, str));
        }
        Statement createStatement = this.connection.createStatement();
        try {
            createStatement.execute(sb.toString());
            if (createStatement != null) {
                createStatement.close();
            }
        } catch (Throwable th) {
            if (createStatement != null) {
                try {
                    createStatement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
