package io.castled.warehouses.connectors.postgres;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.castled.ObjectRegistry;
import io.castled.commons.streams.JdbcRecordInputStream;
import io.castled.commons.streams.RecordInputStream;
import io.castled.constants.ConnectorExecutionConstants;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.schema.models.RecordSchema;
import io.castled.warehouses.WarehouseConfig;
import io.castled.warehouses.WarehouseDataPoller;
import io.castled.warehouses.models.WarehousePollContext;
import io.castled.warehouses.models.WarehousePollResult;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/castled/warehouses/connectors/postgres/PostgresDataPoller.class */
public class PostgresDataPoller implements WarehouseDataPoller {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PostgresDataPoller.class);
    private final PostgresClient postgresClient;
    private final PostgresResultSetSchemaMapper resultSetSchemaMapper;
    private final PostgresWarehouseConnector postgresWarehouseConnector;

    @Inject
    public PostgresDataPoller(PostgresClient postgresClient, PostgresResultSetSchemaMapper postgresResultSetSchemaMapper, PostgresWarehouseConnector postgresWarehouseConnector) {
        this.postgresClient = postgresClient;
        this.resultSetSchemaMapper = postgresResultSetSchemaMapper;
        this.postgresWarehouseConnector = postgresWarehouseConnector;
    }

    @Override // io.castled.warehouses.WarehouseDataPoller
    public WarehousePollResult pollRecords(WarehousePollContext warehousePollContext) {
        try {
            Connection connection = this.postgresWarehouseConnector.getConnection((PostgresWarehouseConfig) warehousePollContext.getWarehouseConfig());
            try {
                List<String> listTables = this.postgresClient.listTables(connection, "castled");
                createUncommittedSnapshot(connection, warehousePollContext, listTables);
                RecordSchema schemaFromQuery = getSchemaFromQuery(connection, String.format("select * from %s", ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(warehousePollContext.getPipelineUUID())));
                WarehousePollResult build = WarehousePollResult.builder().recordInputStream(createRecordStream(connection, warehousePollContext, listTables, schemaFromQuery)).warehouseSchema(schemaFromQuery).build();
                if (connection != null) {
                    connection.close();
                }
                return build;
            } finally {
            }
        } catch (Exception e) {
            log.error("Poll records from warehouse {} failed", warehousePollContext.getWarehouseConfig().getType(), e);
            throw new CastledRuntimeException(e);
        }
    }

    @Override // io.castled.warehouses.WarehouseDataPoller
    public WarehousePollResult resumePoll(WarehousePollContext warehousePollContext) {
        try {
            Connection connection = this.postgresWarehouseConnector.getConnection((PostgresWarehouseConfig) warehousePollContext.getWarehouseConfig());
            try {
                List<String> listTables = this.postgresClient.listTables(connection, "castled");
                if (!listTables.contains(ConnectorExecutionConstants.getUncommittedSnapshot(warehousePollContext.getPipelineUUID()))) {
                    WarehousePollResult pollRecords = pollRecords(warehousePollContext);
                    if (connection != null) {
                        connection.close();
                    }
                    return pollRecords;
                }
                RecordSchema schemaFromQuery = getSchemaFromQuery(connection, String.format("select * from %s", ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(warehousePollContext.getPipelineUUID())));
                WarehousePollResult build = WarehousePollResult.builder().recordInputStream(createRecordStream(connection, warehousePollContext, listTables, schemaFromQuery)).warehouseSchema(schemaFromQuery).resumed(true).build();
                if (connection != null) {
                    connection.close();
                }
                return build;
            } finally {
            }
        } catch (Exception e) {
            log.error("Resume poll from warehouse {} failed", warehousePollContext.getWarehouseConfig().getType(), e);
            return pollRecords(warehousePollContext);
        }
    }

    private RecordInputStream createRecordStream(Connection connection, WarehousePollContext warehousePollContext, List<String> list, RecordSchema recordSchema) {
        return new JdbcRecordInputStream(connection, getDataFetchQuery(warehousePollContext, list), recordSchema);
    }

    private String getDataFetchQuery(WarehousePollContext warehousePollContext, List<String> list) {
        String qualifiedCommittedSnapshot = ConnectorExecutionConstants.getQualifiedCommittedSnapshot(warehousePollContext.getPipelineUUID());
        String qualifiedUncommittedSnapshot = ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(warehousePollContext.getPipelineUUID());
        return list.contains(ConnectorExecutionConstants.getCommittedSnapshot(warehousePollContext.getPipelineUUID())) ? String.format("select * from %s except select * from %s", qualifiedUncommittedSnapshot, qualifiedCommittedSnapshot) : String.format("select * from %s", qualifiedUncommittedSnapshot);
    }

    private void createUncommittedSnapshot(Connection connection, WarehousePollContext warehousePollContext, List<String> list) throws SQLException {
        if (list.contains(ConnectorExecutionConstants.getUncommittedSnapshot(warehousePollContext.getPipelineUUID()))) {
            Statement createStatement = connection.createStatement();
            try {
                createStatement.execute(String.format("drop table if exists %s", ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(warehousePollContext.getPipelineUUID())));
                if (createStatement != null) {
                    createStatement.close();
                }
            } catch (Throwable th) {
                if (createStatement != null) {
                    try {
                        createStatement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        this.postgresClient.createTableFromQuery(connection, ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(warehousePollContext.getPipelineUUID()), warehousePollContext.getQuery(), false, (PostgresTableProperties) ((PostgresWarehouseConnector) ObjectRegistry.getInstance(PostgresWarehouseConnector.class)).getSnapshotTableProperties(warehousePollContext.getPrimaryKeys()));
    }

    private RecordSchema getSchemaFromQuery(Connection connection, String str) throws SQLException {
        PreparedStatement prepareStatement = connection.prepareStatement(str);
        try {
            RecordSchema schema = this.resultSetSchemaMapper.getSchema(prepareStatement.getMetaData());
            if (prepareStatement != null) {
                prepareStatement.close();
            }
            return schema;
        } catch (Throwable th) {
            if (prepareStatement != null) {
                try {
                    prepareStatement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // io.castled.warehouses.WarehouseDataPoller
    public void cleanupPipelineRunResources(WarehousePollContext warehousePollContext) {
    }

    @Override // io.castled.warehouses.WarehouseDataPoller
    public void cleanupPipelineResources(String str, WarehouseConfig warehouseConfig) {
        try {
            Connection connection = ((PostgresWarehouseConnector) ObjectRegistry.getInstance(PostgresWarehouseConnector.class)).getConnection((PostgresWarehouseConfig) warehouseConfig);
            try {
                Statement createStatement = connection.createStatement();
                try {
                    createStatement.execute(String.format("drop table if exists %s", ConnectorExecutionConstants.getQualifiedCommittedSnapshot(str)));
                    createStatement.execute(String.format("drop table if exists %s", ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(str)));
                    if (createStatement != null) {
                        createStatement.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                } catch (Throwable th) {
                    if (createStatement != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (SQLException e) {
            log.error("Cleanup pipeline resources failed for pipeline {}", str);
            throw new CastledRuntimeException(e);
        }
    }
}
