package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.AbstractRecordsProducerTest;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.data.VerifyRecord;
import io.debezium.relational.TableId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/RecordsStreamProducerIT.class */
/**
 * Integration tests for {@link RecordsStreamProducer}.
 *
 * <p>Each test executes SQL through {@link TestHelper}, waits for the test consumer to receive the
 * expected number of records, and then verifies the topic, key, and field contents of the records
 * the producer emitted.
 */
public class RecordsStreamProducerIT extends AbstractRecordsProducerTest {

    /** Schema-qualified name of the table created in {@link #before()}. */
    private static final String TEST_TABLE = "public.test_table";

    /** Insert statement shared by the tests that cover the numeric column types. */
    private static final String INSERT_NUMERIC_TYPES_STMT = "INSERT INTO numeric_table (si, i, bi, d, n, r, db, ss, bs, b) VALUES (1, 123456, 1234567890123, 1.1, 22.22, 3.3, 4.44, 1, 123, true)";

    private RecordsStreamProducer recordsProducer;
    private AbstractRecordsProducerTest.TestConsumer consumer;

    /**
     * Recreates the {@code public} schema with a single-row {@code test_table} and builds a
     * producer from the default test configuration.
     */
    @Before
    public void before() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE SCHEMA public;DROP TABLE IF EXISTS test_table;CREATE TABLE test_table (pk SERIAL, text TEXT, PRIMARY KEY(pk));INSERT INTO test_table(text) VALUES ('insert');");
        this.recordsProducer = newStreamProducer(new PostgresConnectorConfig(TestHelper.defaultConfig().build()));
    }

    /** Stops the producer used by the test, if one was created. */
    @After
    public void after() throws Exception {
        if (this.recordsProducer != null) {
            this.recordsProducer.stop();
        }
    }

    /** Inserts one row into each of the typed tables and verifies the emitted record for each. */
    @Test
    public void shouldReceiveChangesForInsertsWithDifferentDataTypes() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startStreamingExpecting(1);
        assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericType());
        // The consumer is re-armed for one more record before every subsequent insert.
        this.consumer.expects(1);
        assertInsert("INSERT INTO string_table (vc, vcv, ch, c, t) VALUES ('aa', 'bb', 'cdef', 'abc', 'some text')", schemasAndValuesForStringTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO cash_table (csh) VALUES ('$1234.11')", schemaAndValuesForMoneyTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO bitbin_table (ba, bol, bs, bv) VALUES (E'\\\\001\\\\002\\\\003'::bytea, '0'::bit(1), '11'::bit(2), '00'::bit(2))", schemaAndValuesForBinTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO time_table(ts, tz, date, ti, ttz, it) VALUES ('2016-11-04T13:51:30'::TIMESTAMP, '2016-11-04T13:51:30+02:00'::TIMESTAMPTZ, '2016-11-04'::DATE, '13:51:30'::TIME, '13:51:30+02:00'::TIMETZ, 'P1Y2M3DT4H5M0S'::INTERVAL)", schemaAndValuesForDateTimeTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO text_table(j, jb, x, u) VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, '<foo>bar</foo><foo>bar</foo>'::xml, 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::UUID)", schemasAndValuesForTextTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO geom_table(p) VALUES ('(1,1)'::point)", schemaAndValuesForGeomTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO tstzrange_table (unbounded_exclusive_range, bounded_inclusive_range) VALUES ('[2017-06-05 11:29:12.549426+00,)', '[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]')", schemaAndValuesForTstzRangeTypes());
    }

    /** Verifies an insert into a schema, table, and column whose names require quoting. */
    @Test
    public void shouldReceiveChangesForInsertsWithQuotedNames() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startStreamingExpecting(1);
        assertInsert("INSERT INTO \"Quoted_\"\" . Schema\".\"Quoted_\"\" . Table\" (\"Quoted_\"\" . Text_Column\") VALUES ('some text')", schemasAndValuesForQuotedTypes());
    }

    /** Verifies an insert into a table with integer, bigint, and text array columns. */
    @Test
    public void shouldReceiveChangesForInsertsWithArrayTypes() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startStreamingExpecting(1);
        assertInsert("INSERT INTO array_table (int_array, bigint_array, text_array) VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}')", schemasAndValuesForArrayTypes());
    }

    /** Verifies that an insert into a table created after streaming started is received. */
    @Test
    public void shouldReceiveChangesForNewTable() throws Exception {
        startStreamingExpecting(1);
        executeAndWait("CREATE SCHEMA s1;CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (11);");
        assertRecordInserted("s1.a", "pk", 1);
    }

    /** Verifies that an insert into a table renamed after streaming started uses the new name. */
    @Test
    public void shouldReceiveChangesForRenamedTable() throws Exception {
        startStreamingExpecting(1);
        executeAndWait("DROP TABLE IF EXISTS renamed_test_table;ALTER TABLE test_table RENAME TO renamed_test_table;INSERT INTO renamed_test_table (text) VALUES ('new');");
        assertRecordInserted("public.renamed_test_table", "pk", 2);
    }

    /**
     * Verifies update records: first only the "after" state, then, once the table has been
     * switched to {@code REPLICA IDENTITY FULL}, both the "before" and "after" states.
     */
    @Test
    public void shouldReceiveChangesForUpdates() throws Exception {
        startStreamingExpecting(1);
        executeAndWait("UPDATE test_table set text='update' WHERE pk=1");
        String topic = TestHelper.topicName(TEST_TABLE);
        SourceRecord firstUpdate = nextRecordOnTopic(topic);
        VerifyRecord.isValidUpdate(firstUpdate, "pk", 1);
        assertRecordSchemaAndValues(singleExpectedField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update"), firstUpdate, "after");

        this.consumer.expects(1);
        TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY FULL");
        executeAndWait("UPDATE test_table set text='update2' WHERE pk=1");
        SourceRecord secondUpdate = nextRecordOnTopic(topic);
        VerifyRecord.isValidUpdate(secondUpdate, "pk", 1);
        assertRecordSchemaAndValues(singleExpectedField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update"), secondUpdate, "before");
        assertRecordSchemaAndValues(singleExpectedField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update2"), secondUpdate, "after");
    }

    /** Verifies update records while a column is added, renamed, and finally dropped. */
    @Test
    public void shouldReceiveChangesForUpdatesWithColumnChanges() throws Exception {
        startStreamingExpecting(1);

        // Column added: "before" has no value for it, "after" carries the new value.
        executeAndWait("ALTER TABLE test_table ADD COLUMN uvc VARCHAR(2);ALTER TABLE test_table REPLICA IDENTITY FULL;UPDATE test_table SET uvc ='aa' WHERE pk = 1;");
        SourceRecord afterAdd = nextRecordOnTopic(TestHelper.topicName(TEST_TABLE));
        VerifyRecord.isValidUpdate(afterAdd, "pk", 1);
        assertRecordSchemaAndValues(singleExpectedField("uvc", null, null), afterAdd, "before");
        assertRecordSchemaAndValues(singleExpectedField("uvc", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "aa"), afterAdd, "after");

        // Column renamed: both states are reported under the new column name.
        this.consumer.expects(1);
        executeAndWait("ALTER TABLE test_table RENAME COLUMN uvc to xvc;UPDATE test_table SET xvc ='bb' WHERE pk = 1;");
        SourceRecord afterRename = this.consumer.remove();
        VerifyRecord.isValidUpdate(afterRename, "pk", 1);
        assertRecordSchemaAndValues(singleExpectedField("xvc", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "aa"), afterRename, "before");
        assertRecordSchemaAndValues(singleExpectedField("xvc", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "bb"), afterRename, "after");

        // Column dropped: only the validity of the update record is checked.
        this.consumer.expects(1);
        executeAndWait("ALTER TABLE test_table DROP COLUMN xvc;UPDATE test_table SET text ='update' WHERE pk = 1;");
        VerifyRecord.isValidUpdate(this.consumer.remove(), "pk", 1);
    }

    /**
     * Verifies that changing a primary key yields three records: a delete and a tombstone for the
     * old key, followed by an insert for the new key.
     */
    @Test
    public void shouldReceiveChangesForUpdatesWithPKChanges() throws Exception {
        startStreamingExpecting(3);
        executeAndWait("UPDATE test_table SET text = 'update', pk = 2");
        String topic = TestHelper.topicName(TEST_TABLE);
        VerifyRecord.isValidDelete(nextRecordOnTopic(topic), "pk", 1);
        VerifyRecord.isValidTombstone(nextRecordOnTopic(topic), "pk", 1);
        VerifyRecord.isValidInsert(nextRecordOnTopic(topic), "pk", 2);
    }

    /** Verifies that a column default appears in the "after" state of an insert record. */
    @Test
    public void shouldReceiveChangesForDefaultValues() throws Exception {
        startStreamingExpecting(1);
        executeAndWait("ALTER TABLE test_table REPLICA IDENTITY FULL;ALTER TABLE test_table ADD COLUMN default_column TEXT DEFAULT 'default';INSERT INTO test_table (text) VALUES ('update');");
        SourceRecord inserted = nextRecordOnTopic(TestHelper.topicName(TEST_TABLE));
        VerifyRecord.isValidInsert(inserted, "pk", 2);
        List<AbstractRecordsProducerTest.SchemaAndValueField> expectedAfter = Arrays.asList(
                new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update"),
                new AbstractRecordsProducerTest.SchemaAndValueField("default_column", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "default"));
        assertRecordSchemaAndValues(expectedAfter, inserted, "after");
    }

    /**
     * Verifies that deleting two rows yields, after the initial insert record, a delete and a
     * tombstone record for each deleted key.
     */
    @Test
    public void shouldReceiveChangesForDeletes() throws Exception {
        // 1 insert + 2 x (delete + tombstone)
        startStreamingExpecting(5);
        executeAndWait("INSERT INTO test_table (text) VALUES ('insert2');DELETE FROM test_table WHERE pk > 0;");
        String topic = TestHelper.topicName(TEST_TABLE);
        assertRecordInserted(TEST_TABLE, "pk", 2);
        VerifyRecord.isValidDelete(nextRecordOnTopic(topic), "pk", 1);
        VerifyRecord.isValidTombstone(nextRecordOnTopic(topic), "pk", 1);
        VerifyRecord.isValidDelete(nextRecordOnTopic(topic), "pk", 2);
        VerifyRecord.isValidTombstone(nextRecordOnTopic(topic), "pk", 2);
    }

    /**
     * Verifies that with {@code DecimalHandlingMode.DOUBLE} the {@code d} and {@code n} columns
     * of {@code numeric_table} are emitted as optional FLOAT64 values.
     */
    @Test
    public void shouldReceiveNumericTypeAsDouble() throws Exception {
        PostgresConnectorConfig doubleModeConfig = new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, PostgresConnectorConfig.DecimalHandlingMode.DOUBLE).build());
        // Stop the default producer built in before() so it is not orphaned when the field is
        // reassigned; after() would otherwise only ever stop the replacement.
        if (this.recordsProducer != null) {
            this.recordsProducer.stop();
        }
        this.recordsProducer = newStreamProducer(doubleModeConfig);
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startStreamingExpecting(1);
        List<AbstractRecordsProducerTest.SchemaAndValueField> expectedFields = schemasAndValuesForNumericType();
        // Positions 3 and 4 are replaced with the double-typed expectations for "d" and "n".
        expectedFields.set(3, new AbstractRecordsProducerTest.SchemaAndValueField("d", Schema.OPTIONAL_FLOAT64_SCHEMA, Double.valueOf(1.1d)));
        expectedFields.set(4, new AbstractRecordsProducerTest.SchemaAndValueField("n", Schema.OPTIONAL_FLOAT64_SCHEMA, Double.valueOf(22.22d)));
        assertInsert(INSERT_NUMERIC_TYPES_STMT, expectedFields);
    }

    /**
     * Builds a stream producer, together with its task context and source info, from the given
     * connector configuration.
     *
     * @param config the connector configuration to use
     * @return a new, not yet started producer
     */
    private RecordsStreamProducer newStreamProducer(PostgresConnectorConfig config) throws Exception {
        PostgresTaskContext taskContext = new PostgresTaskContext(config, new PostgresSchema(config));
        return new RecordsStreamProducer(taskContext, new SourceInfo(config.serverName()));
    }

    /**
     * Creates a test consumer expecting the given number of records and starts the current
     * producer with it.
     *
     * @param expectedRecords number of records the consumer should initially wait for
     */
    private void startStreamingExpecting(int expectedRecords) throws Exception {
        this.consumer = testConsumer(expectedRecords, new String[0]);
        this.recordsProducer.start(this.consumer);
    }

    /**
     * Removes the next record from the consumer and asserts it was published to the given topic.
     *
     * @param expectedTopic the topic the record must belong to
     * @return the removed record
     */
    private SourceRecord nextRecordOnTopic(String expectedTopic) throws InterruptedException {
        SourceRecord record = this.consumer.remove();
        Assert.assertEquals(expectedTopic, record.topic());
        return record;
    }

    /**
     * Builds a one-element list of expected fields.
     *
     * @param fieldName name of the expected field
     * @param schema expected schema of the field; may be {@code null}
     * @param value expected value of the field; may be {@code null}
     * @return a single-element list holding the expectation
     */
    private List<AbstractRecordsProducerTest.SchemaAndValueField> singleExpectedField(String fieldName, Schema schema, Object value) {
        return Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField(fieldName, schema, value));
    }

    /**
     * Executes an insert statement and verifies the resulting record: it must be an insert with
     * {@code pk = 1} on the topic of the statement's table, and its "after" state must match the
     * expected fields.
     *
     * @param statement the SQL insert statement to execute
     * @param expectedAfter the expected fields of the record's "after" state
     * @throws Exception if executing the statement or waiting for the record fails; failures are
     *         propagated unchanged rather than wrapped, so the original cause stays visible
     */
    private void assertInsert(String statement, List<AbstractRecordsProducerTest.SchemaAndValueField> expectedAfter) throws Exception {
        TableId tableId = tableIdFromInsertStmt(statement);
        String qualifiedTableName = tableId.schema() + "." + tableId.table();
        executeAndWait(statement);
        SourceRecord inserted = assertRecordInserted(qualifiedTableName, "pk", 1);
        assertRecordOffset(inserted, false, false);
        assertRecordSchemaAndValues(expectedAfter, inserted, "after");
    }

    /**
     * Removes the next record from the consumer and asserts it is a valid insert on the topic of
     * the given table.
     *
     * @param tableName schema-qualified table name used to derive the expected topic
     * @param pkColumn name of the primary key column
     * @param pk expected primary key value
     * @return the verified record
     */
    private SourceRecord assertRecordInserted(String tableName, String pkColumn, int pk) throws InterruptedException {
        Assert.assertFalse("records not generated", this.consumer.isEmpty());
        SourceRecord inserted = nextRecordOnTopic(TestHelper.topicName(tableName));
        VerifyRecord.isValidInsert(inserted, pkColumn, pk);
        return inserted;
    }

    /**
     * Executes the given SQL and then waits up to two seconds on the consumer.
     *
     * @param statements one or more SQL statements to execute
     */
    private void executeAndWait(String statements) throws Exception {
        TestHelper.execute(statements);
        this.consumer.await(2L, TimeUnit.SECONDS);
    }
}
