package io.debezium.connector.postgresql;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.AbstractRecordsProducerTest;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/SnapshotWithOverridesProducerIT.class */
/**
 * Integration test checking that {@link RecordsSnapshotProducer} applies the per-table
 * SELECT statement overrides configured through
 * {@link PostgresConnectorConfig#SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE} while snapshotting.
 *
 * <p>Each test seeds two tables ({@code over.t1} and {@code over.t2}) with the same six primary
 * keys (1, 2, 3, 101, 102, 103) and then asserts how many snapshot records arrive per topic.
 */
public class SnapshotWithOverridesProducerIT extends AbstractRecordsProducerTest {

    /** Creates the {@code over} schema with tables {@code t1} and {@code t2}, six rows each. */
    private static final String STATEMENTS = "CREATE SCHEMA over;"
            + "CREATE TABLE over.t1 (pk INT, PRIMARY KEY(pk));"
            + "CREATE TABLE over.t2 (pk INT, PRIMARY KEY(pk));"
            + "INSERT INTO over.t1 VALUES (1);"
            + "INSERT INTO over.t1 VALUES (2);"
            + "INSERT INTO over.t1 VALUES (3);"
            + "INSERT INTO over.t1 VALUES (101);"
            + "INSERT INTO over.t1 VALUES (102);"
            + "INSERT INTO over.t1 VALUES (103);"
            + "INSERT INTO over.t2 VALUES (1);"
            + "INSERT INTO over.t2 VALUES (2);"
            + "INSERT INTO over.t2 VALUES (3);"
            + "INSERT INTO over.t2 VALUES (101);"
            + "INSERT INTO over.t2 VALUES (102);"
            + "INSERT INTO over.t2 VALUES (103);";

    private RecordsSnapshotProducer snapshotProducer;
    private PostgresTaskContext context;

    /**
     * Drops all schemas and builds the task context used by the test.
     *
     * <p>Not annotated with {@code @Before}: every test calls it explicitly because each one
     * needs its own override configuration.
     *
     * @param configuration test-specific settings combined with {@code TestHelper.defaultConfig()}
     * @throws SQLException declared for the database clean-up performed by {@code TestHelper}
     */
    public void before(Configuration configuration) throws SQLException {
        TestHelper.dropAllSchemas();
        PostgresConnectorConfig connectorConfig =
                new PostgresConnectorConfig(TestHelper.defaultConfig().with(configuration).build());
        this.context = new PostgresTaskContext(connectorConfig, new PostgresSchema(connectorConfig));
    }

    /**
     * Stops the snapshot producer if the test got far enough to create one.
     *
     * @throws Exception if stopping the producer fails
     */
    @After
    public void after() throws Exception {
        if (this.snapshotProducer != null) {
            this.snapshotProducer.stop();
        }
    }

    /**
     * Overrides the snapshot SELECT for {@code over.t1} only: the override keeps the three rows
     * with {@code pk > 100}, while {@code over.t2} has no override and yields all six rows.
     */
    @Test
    public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws Exception {
        before(Configuration.create()
                .with(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, "over.t1")
                .with(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + ".over.t1",
                        "SELECT * FROM over.t1 WHERE pk > 100")
                .build());

        // 3 rows from the filtered t1 + 6 rows from the unfiltered t2
        Map<String, List<SourceRecord>> recordsByTopic = snapshotAndGroupByTopic(9);

        Assertions.assertThat(recordsByTopic.get("test_server.over.t1")).hasSize(3);
        Assertions.assertThat(recordsByTopic.get("test_server.over.t2")).hasSize(6);
    }

    /**
     * Overrides the snapshot SELECT for both tables: {@code over.t1} keeps the two rows with
     * {@code pk > 101} and {@code over.t2} keeps the three rows with {@code pk > 100}.
     */
    @Test
    public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() throws Exception {
        before(Configuration.create()
                .with(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, "over.t1,over.t2")
                .with(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + ".over.t1",
                        "SELECT * FROM over.t1 WHERE pk > 101")
                .with(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + ".over.t2",
                        "SELECT * FROM over.t2 WHERE pk > 100")
                .build());

        // 2 rows from the filtered t1 + 3 rows from the filtered t2
        Map<String, List<SourceRecord>> recordsByTopic = snapshotAndGroupByTopic(5);

        Assertions.assertThat(recordsByTopic.get("test_server.over.t1")).hasSize(2);
        Assertions.assertThat(recordsByTopic.get("test_server.over.t2")).hasSize(3);
    }

    /**
     * Creates the snapshot producer, seeds the database, runs the snapshot and groups the
     * consumed records by topic.
     *
     * <p>{@link #before(Configuration)} must have been called first so that {@code context} is set.
     * The statement order (producer created before the tables are seeded) is kept exactly as the
     * tests originally had it.
     *
     * @param expectedRecordCount total number of records the snapshot is expected to emit
     * @return the consumed records keyed by topic name
     * @throws Exception if seeding the database, running the snapshot or awaiting records fails
     */
    private Map<String, List<SourceRecord>> snapshotAndGroupByTopic(int expectedRecordCount) throws Exception {
        this.snapshotProducer = new RecordsSnapshotProducer(this.context, new SourceInfo("test_server"), false);
        TestHelper.execute(STATEMENTS);
        AbstractRecordsProducerTest.TestConsumer consumer = testConsumer(expectedRecordCount, "over");
        this.snapshotProducer.start(consumer);
        consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        return recordsByTopic(expectedRecordCount, consumer);
    }

    /**
     * Removes {@code i} records from the consumer and groups them by topic, preserving the order
     * in which they were removed within each topic.
     *
     * @param i number of records to take from the consumer
     * @param testConsumer consumer holding the records produced by the snapshot
     * @return a mutable map from topic name to the records removed for that topic
     */
    private Map<String, List<SourceRecord>> recordsByTopic(int i, AbstractRecordsProducerTest.TestConsumer testConsumer) {
        Map<String, List<SourceRecord>> grouped = new HashMap<>();
        for (int taken = 0; taken < i; taken++) {
            SourceRecord next = testConsumer.remove();
            // Create the per-topic list lazily, only the first time a topic is seen.
            grouped.computeIfAbsent(next.topic(), topic -> new ArrayList<>()).add(next);
        }
        return grouped;
    }
}
