package io.debezium.connector.mysql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mysql/MySqlConnectorIT.class */
public class MySqlConnectorIT extends AbstractConnectorTest {
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
    private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "connector_test").withDbHistoryPath(DB_HISTORY_PATH);
    private final UniqueDatabase RO_DATABASE = new UniqueDatabase("myServer2", "connector_test_ro", this.DATABASE).withDbHistoryPath(DB_HISTORY_PATH);
    private static final int PRODUCTS_TABLE_EVENT_COUNT = 9;
    private static final int ORDERS_TABLE_EVENT_COUNT = 5;
    private static final int INITIAL_EVENT_COUNT = 33;
    private Configuration config;

    /* loaded from: input_file:io/debezium/connector/mysql/MySqlConnectorIT$BinlogPosition.class */
    protected static class BinlogPosition {
        private String binlogFilename;
        private long binlogPosition;
        private String gtidSet;

        protected BinlogPosition() {
        }

        public void readFromDatabase(ResultSet resultSet) throws SQLException {
            if (resultSet.next()) {
                this.binlogFilename = resultSet.getString(1);
                this.binlogPosition = resultSet.getLong(2);
                if (resultSet.getMetaData().getColumnCount() > 4) {
                    this.gtidSet = resultSet.getString(MySqlConnectorIT.ORDERS_TABLE_EVENT_COUNT);
                }
            }
        }

        public String binlogFilename() {
            return this.binlogFilename;
        }

        public long binlogPosition() {
            return this.binlogPosition;
        }

        public String gtidSet() {
            return this.gtidSet;
        }

        public boolean hasGtids() {
            return this.gtidSet != null;
        }

        public String toString() {
            return "file=" + this.binlogFilename + ", pos=" + this.binlogPosition + ", gtids=" + (this.gtidSet != null ? this.gtidSet : "");
        }
    }

    @Before
    public void beforeEach() {
        stopConnector();
        this.DATABASE.createAndInitialize();
        this.RO_DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(DB_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
            Testing.Files.delete(DB_HISTORY_PATH);
        } catch (Throwable th) {
            Testing.Files.delete(DB_HISTORY_PATH);
            throw th;
        }
    }

    @Test
    public void shouldNotStartWithInvalidConfiguration() {
        this.config = Configuration.create().with(MySqlConnectorConfig.SERVER_NAME, "myserver").with(KafkaDatabaseHistory.TOPIC, "myserver").with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class).with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH).build();
        this.logger.info("Attempting to start the connector with an INVALID configuration, so MULTIPLE error messages and exceptions will appear in the log");
        start(MySqlConnector.class, this.config, (z, str, th) -> {
            Assertions.assertThat(z).isFalse();
            Assertions.assertThat(th).isNotNull();
        });
        assertConnectorNotRunning();
    }

    @Test
    public void shouldFailToValidateInvalidConfiguration() {
        Config validate = new MySqlConnector().validate(Configuration.create().with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class).with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH).build().asMap());
        assertConfigurationErrors(validate, MySqlConnectorConfig.HOSTNAME, 1);
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.PORT});
        assertConfigurationErrors(validate, MySqlConnectorConfig.USER, 1);
        assertConfigurationErrors(validate, MySqlConnectorConfig.SERVER_NAME, 2);
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SERVER_ID});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.TABLES_IGNORE_BUILTIN});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.DATABASE_WHITELIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.DATABASE_BLACKLIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.TABLE_WHITELIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.TABLE_BLACKLIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.COLUMN_BLACKLIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.CONNECTION_TIMEOUT_MS});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.KEEP_ALIVE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.MAX_QUEUE_SIZE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.MAX_BATCH_SIZE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.POLL_INTERVAL_MS});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.DATABASE_HISTORY});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SNAPSHOT_MODE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SNAPSHOT_NEW_TABLES});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_MODE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_KEYSTORE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_TRUSTSTORE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.DECIMAL_HANDLING_MODE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.TIME_PRECISION_MODE});
        assertConfigurationErrors(validate, KafkaDatabaseHistory.BOOTSTRAP_SERVERS);
        assertConfigurationErrors(validate, KafkaDatabaseHistory.TOPIC);
        assertNoConfigurationErrors(validate, new Field[]{KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS});
        assertNoConfigurationErrors(validate, new Field[]{KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS});
    }

    @Test
    public void shouldValidateValidConfigurationWithSSL() {
        Config validate = new MySqlConnector().validate(this.DATABASE.defaultJdbcConfigBuilder().with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.REQUIRED).with(MySqlConnectorConfig.SSL_KEYSTORE, "/some/path/to/keystore").with(MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD, "keystore1234").with(MySqlConnectorConfig.SSL_TRUSTSTORE, "/some/path/to/truststore").with(MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD, "truststore1234").with(MySqlConnectorConfig.SERVER_ID, 18765).with(MySqlConnectorConfig.SERVER_NAME, "myServer").with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com").with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic").with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build().asMap());
        assertConfigurationErrors(validate, MySqlConnectorConfig.HOSTNAME, 0, 1);
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.PORT});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.USER});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.PASSWORD});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SERVER_NAME});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SERVER_ID});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.TABLES_IGNORE_BUILTIN});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.DATABASE_WHITELIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.DATABASE_BLACKLIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.TABLE_WHITELIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.TABLE_BLACKLIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.COLUMN_BLACKLIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.CONNECTION_TIMEOUT_MS});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.KEEP_ALIVE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.MAX_QUEUE_SIZE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.MAX_BATCH_SIZE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.POLL_INTERVAL_MS});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.DATABASE_HISTORY});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SNAPSHOT_MODE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SNAPSHOT_NEW_TABLES});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_MODE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_KEYSTORE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_TRUSTSTORE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.DECIMAL_HANDLING_MODE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.TIME_PRECISION_MODE});
        assertNoConfigurationErrors(validate, new Field[]{KafkaDatabaseHistory.BOOTSTRAP_SERVERS});
        assertNoConfigurationErrors(validate, new Field[]{KafkaDatabaseHistory.TOPIC});
        assertNoConfigurationErrors(validate, new Field[]{KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS});
        assertNoConfigurationErrors(validate, new Field[]{KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS});
    }

    @Test
    public void shouldValidateAcceptableConfiguration() {
        Config validate = new MySqlConnector().validate(this.DATABASE.defaultJdbcConfigBuilder().with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED).with(MySqlConnectorConfig.SERVER_ID, 18765).with(MySqlConnectorConfig.SERVER_NAME, "myServer").with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com").with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic").with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(MySqlConnectorConfig.ON_CONNECT_STATEMENTS, "SET SESSION wait_timeout=2000").build().asMap());
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.HOSTNAME});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.PORT});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.USER});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.PASSWORD});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.ON_CONNECT_STATEMENTS});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SERVER_NAME});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SERVER_ID});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.TABLES_IGNORE_BUILTIN});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.DATABASE_WHITELIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.DATABASE_BLACKLIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.TABLE_WHITELIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.TABLE_BLACKLIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.COLUMN_BLACKLIST});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.MSG_KEY_COLUMNS});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.CONNECTION_TIMEOUT_MS});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.KEEP_ALIVE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.MAX_QUEUE_SIZE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.MAX_BATCH_SIZE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.POLL_INTERVAL_MS});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.DATABASE_HISTORY});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SNAPSHOT_MODE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SNAPSHOT_NEW_TABLES});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_MODE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_KEYSTORE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_TRUSTSTORE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.DECIMAL_HANDLING_MODE});
        assertNoConfigurationErrors(validate, new Field[]{MySqlConnectorConfig.TIME_PRECISION_MODE});
        assertNoConfigurationErrors(validate, new Field[]{KafkaDatabaseHistory.BOOTSTRAP_SERVERS});
        assertNoConfigurationErrors(validate, new Field[]{KafkaDatabaseHistory.TOPIC});
        assertNoConfigurationErrors(validate, new Field[]{KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS});
        assertNoConfigurationErrors(validate, new Field[]{KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS});
    }

    @Test
    @FixFor({"DBZ-639"})
    public void shouldValidateLockingModeNoneWithValidSnapshotModeConfiguration() {
        Iterator it = ((List) Arrays.stream(MySqlConnectorConfig.SnapshotMode.values()).map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.toList())).iterator();
        while (it.hasNext()) {
            Configuration build = this.DATABASE.defaultJdbcConfigBuilder().with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED).with(MySqlConnectorConfig.SERVER_ID, 18765).with(MySqlConnectorConfig.SERVER_NAME, "myServer").with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com").with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic").with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, MySqlConnectorConfig.SnapshotLockingMode.NONE.getValue()).with(MySqlConnectorConfig.SNAPSHOT_MODE, (String) it.next()).build();
            assertNoConfigurationErrors(new MySqlConnector().validate(build.asMap()), new Field[]{MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE});
            Assertions.assertThat(new MySqlConnectorConfig(build).getSnapshotLockingMode()).isEqualTo(MySqlConnectorConfig.SnapshotLockingMode.NONE);
        }
    }

    @Test
    public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException, InterruptedException {
        JdbcConnection connect;
        Throwable th;
        MySQLConnection forTestDatabase;
        Throwable th2;
        boolean equals = System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"));
        if (!equals) {
            Thread.sleep(5000L);
        }
        this.config = Configuration.create().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost")).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306")).with(MySqlConnectorConfig.USER, "snapper").with(MySqlConnectorConfig.PASSWORD, "snapperpass").with(MySqlConnectorConfig.SERVER_ID, 18765).with(MySqlConnectorConfig.SERVER_NAME, this.DATABASE.getServerName()).with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(MySqlConnectorConfig.DATABASE_WHITELIST, this.DATABASE.getDatabaseName()).with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH).build();
        start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(39);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.getServerName()).size()).isEqualTo(12);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("customers")).size()).isEqualTo(4);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.databaseNames().size()).isEqualTo(2);
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(11);
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("readbinlog_test")).isNull();
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase("").size()).isEqualTo(1);
        consumeRecordsByTopic.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).forEach(this::print);
        consumeRecordsByTopic.forEach(this::validate);
        List allRecordsInOrder = consumeRecordsByTopic.allRecordsInOrder();
        SourceRecord sourceRecord = (SourceRecord) allRecordsInOrder.get(allRecordsInOrder.size() - 1);
        SourceRecord sourceRecord2 = (SourceRecord) allRecordsInOrder.get(allRecordsInOrder.size() - 2);
        Assertions.assertThat(sourceRecord2.sourceOffset().containsKey("snapshot")).isTrue();
        Assertions.assertThat(sourceRecord.sourceOffset().containsKey("snapshot")).isFalse();
        Assertions.assertThat(((Struct) sourceRecord2.value()).getStruct("source").getString("snapshot")).isEqualTo("true");
        Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("source").getString("snapshot")).isEqualTo("last");
        waitForAvailableRecords(3L, TimeUnit.SECONDS);
        System.out.println("TOTAL CONSUMED = " + consumeAvailableRecords(this::print));
        stopConnector();
        MySQLConnection forTestDatabase2 = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th3 = null;
        try {
            JdbcConnection connect2 = forTestDatabase2.connect();
            Throwable th4 = null;
            try {
                try {
                    connect2.query("SELECT * FROM products", resultSet -> {
                        if (Testing.Print.isEnabled()) {
                            connect2.print(resultSet);
                        }
                    });
                    connect2.execute(new String[]{"INSERT INTO products VALUES (default,'robot','Toy robot',1.304);"});
                    connect2.query("SELECT * FROM products", resultSet2 -> {
                        if (Testing.Print.isEnabled()) {
                            connect2.print(resultSet2);
                        }
                    });
                    if (connect2 != null) {
                        if (0 != 0) {
                            try {
                                connect2.close();
                            } catch (Throwable th5) {
                                th4.addSuppressed(th5);
                            }
                        } else {
                            connect2.close();
                        }
                    }
                    Testing.print("*** Restarting connector after inserts were made");
                    start(MySqlConnector.class, this.config);
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
                    Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(1);
                    Assertions.assertThat(consumeRecordsByTopic2.topics().size()).isEqualTo(1);
                    assertInsert((SourceRecord) consumeRecordsByTopic2.recordsForTopic(this.DATABASE.topicForTable("products")).get(0), "id", 110);
                    Testing.print("*** Done with inserts and restart");
                    Testing.print("*** Stopping connector");
                    stopConnector();
                    Testing.print("*** Restarting connector");
                    start(MySqlConnector.class, this.config);
                    MySQLConnection forTestDatabase3 = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                    Throwable th6 = null;
                    try {
                        connect = forTestDatabase3.connect();
                        th = null;
                    } finally {
                        if (forTestDatabase3 != null) {
                            if (0 != 0) {
                                try {
                                    forTestDatabase3.close();
                                } catch (Throwable th7) {
                                    th6.addSuppressed(th7);
                                }
                            } else {
                                forTestDatabase3.close();
                            }
                        }
                    }
                } finally {
                }
                try {
                    connect.execute(new String[]{"INSERT INTO products VALUES (1001,'roy','old robot',1234.56);"});
                    connect.query("SELECT * FROM products", resultSet3 -> {
                        if (Testing.Print.isEnabled()) {
                            connect.print(resultSet3);
                        }
                    });
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th8) {
                                th.addSuppressed(th8);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(1);
                    Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(1);
                    Assertions.assertThat(consumeRecordsByTopic3.topics().size()).isEqualTo(1);
                    assertInsert((SourceRecord) consumeRecordsByTopic3.recordsForTopic(this.DATABASE.topicForTable("products")).get(0), "id", 1001);
                    MySQLConnection forTestDatabase4 = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                    Throwable th9 = null;
                    try {
                        JdbcConnection connect3 = forTestDatabase4.connect();
                        Throwable th10 = null;
                        try {
                            connect3.execute(new String[]{"UPDATE products SET id=2001, description='really old robot' WHERE id=1001"});
                            connect3.query("SELECT * FROM products", resultSet4 -> {
                                if (Testing.Print.isEnabled()) {
                                    connect3.print(resultSet4);
                                }
                            });
                            if (connect3 != null) {
                                if (0 != 0) {
                                    try {
                                        connect3.close();
                                    } catch (Throwable th11) {
                                        th10.addSuppressed(th11);
                                    }
                                } else {
                                    connect3.close();
                                }
                            }
                            List recordsForTopic = consumeRecordsByTopic(3).recordsForTopic(this.DATABASE.topicForTable("products"));
                            Assertions.assertThat(recordsForTopic.size()).isEqualTo(3);
                            assertDelete((SourceRecord) recordsForTopic.get(0), "id", 1001);
                            assertTombstone((SourceRecord) recordsForTopic.get(1), "id", 1001);
                            assertInsert((SourceRecord) recordsForTopic.get(2), "id", 2001);
                            Testing.print("*** Done with PK change");
                            MySQLConnection forTestDatabase5 = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                            Throwable th12 = null;
                            try {
                                connect2 = forTestDatabase5.connect();
                                Throwable th13 = null;
                                try {
                                    try {
                                        connect2.execute(new String[]{"UPDATE products SET weight=1345.67 WHERE id=2001"});
                                        connect2.query("SELECT * FROM products", resultSet5 -> {
                                            if (Testing.Print.isEnabled()) {
                                                connect2.print(resultSet5);
                                            }
                                        });
                                        if (connect2 != null) {
                                            if (0 != 0) {
                                                try {
                                                    connect2.close();
                                                } catch (Throwable th14) {
                                                    th13.addSuppressed(th14);
                                                }
                                            } else {
                                                connect2.close();
                                            }
                                        }
                                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic4 = consumeRecordsByTopic(1);
                                        Assertions.assertThat(consumeRecordsByTopic4.topics().size()).isEqualTo(1);
                                        List recordsForTopic2 = consumeRecordsByTopic4.recordsForTopic(this.DATABASE.topicForTable("products"));
                                        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
                                        assertUpdate((SourceRecord) recordsForTopic2.get(0), "id", 2001);
                                        recordsForTopic2.forEach(this::validate);
                                        Testing.print("*** Done with simple update");
                                        MySQLConnection forTestDatabase6 = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                                        Throwable th15 = null;
                                        try {
                                            JdbcConnection connect4 = forTestDatabase6.connect();
                                            Throwable th16 = null;
                                            try {
                                                try {
                                                    connect4.execute(new String[]{String.format("ALTER TABLE %s.products ADD COLUMN volume FLOAT, ADD COLUMN alias VARCHAR(30) NULL AFTER description", this.DATABASE.getDatabaseName())});
                                                    connect4.execute(new String[]{"UPDATE products SET volume=13.5 WHERE id=2001"});
                                                    connect4.query("SELECT * FROM products", resultSet6 -> {
                                                        if (Testing.Print.isEnabled()) {
                                                            connect4.print(resultSet6);
                                                        }
                                                    });
                                                    if (connect4 != null) {
                                                        if (0 != 0) {
                                                            try {
                                                                connect4.close();
                                                            } catch (Throwable th17) {
                                                                th16.addSuppressed(th17);
                                                            }
                                                        } else {
                                                            connect4.close();
                                                        }
                                                    }
                                                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic5 = consumeRecordsByTopic(2);
                                                    Assertions.assertThat(consumeRecordsByTopic5.topics().size()).isEqualTo(2);
                                                    Assertions.assertThat(consumeRecordsByTopic5.recordsForTopic(this.DATABASE.getServerName()).size()).isEqualTo(1);
                                                    List recordsForTopic3 = consumeRecordsByTopic5.recordsForTopic(this.DATABASE.topicForTable("products"));
                                                    Assertions.assertThat(recordsForTopic3.size()).isEqualTo(1);
                                                    assertUpdate((SourceRecord) recordsForTopic3.get(0), "id", 2001);
                                                    recordsForTopic3.forEach(this::validate);
                                                    Testing.print("*** Done with schema change (same db and fully-qualified name)");
                                                    forTestDatabase = MySQLConnection.forTestDatabase("emptydb");
                                                    th2 = null;
                                                } finally {
                                                }
                                            } finally {
                                                if (connect4 != null) {
                                                    if (th16 != null) {
                                                        try {
                                                            connect4.close();
                                                        } catch (Throwable th18) {
                                                            th16.addSuppressed(th18);
                                                        }
                                                    } else {
                                                        connect4.close();
                                                    }
                                                }
                                            }
                                        } finally {
                                            if (forTestDatabase6 != null) {
                                                if (0 != 0) {
                                                    try {
                                                        forTestDatabase6.close();
                                                    } catch (Throwable th19) {
                                                        th15.addSuppressed(th19);
                                                    }
                                                } else {
                                                    forTestDatabase6.close();
                                                }
                                            }
                                        }
                                    } finally {
                                    }
                                    try {
                                        JdbcConnection connect5 = forTestDatabase.connect();
                                        Throwable th20 = null;
                                        try {
                                            connect5.execute(new String[]{String.format("CREATE TABLE %s.stores ( id INT(11) PRIMARY KEY NOT NULL AUTO_INCREMENT, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL );", this.DATABASE.getDatabaseName())});
                                            if (connect5 != null) {
                                                if (0 != 0) {
                                                    try {
                                                        connect5.close();
                                                    } catch (Throwable th21) {
                                                        th20.addSuppressed(th21);
                                                    }
                                                } else {
                                                    connect5.close();
                                                }
                                            }
                                            AbstractConnectorTest.SourceRecords consumeRecordsByTopic6 = consumeRecordsByTopic(1);
                                            Assertions.assertThat(consumeRecordsByTopic6.topics().size()).isEqualTo(1);
                                            Assertions.assertThat(consumeRecordsByTopic6.recordsForTopic(this.DATABASE.getServerName()).size()).isEqualTo(1);
                                            consumeRecordsByTopic6.recordsForTopic(this.DATABASE.getServerName()).forEach(this::validate);
                                            Testing.print("*** Done with PK change (different db and fully-qualified name)");
                                            MySQLConnection forTestDatabase7 = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                                            Throwable th22 = null;
                                            try {
                                                JdbcConnection connect6 = forTestDatabase7.connect();
                                                Throwable th23 = null;
                                                try {
                                                    connect6.execute(new String[]{"UPDATE products_on_hand SET quantity=20 WHERE product_id=109"});
                                                    connect6.query("SELECT * FROM products_on_hand", resultSet7 -> {
                                                        if (Testing.Print.isEnabled()) {
                                                            connect6.print(resultSet7);
                                                        }
                                                    });
                                                    if (connect6 != null) {
                                                        if (0 != 0) {
                                                            try {
                                                                connect6.close();
                                                            } catch (Throwable th24) {
                                                                th23.addSuppressed(th24);
                                                            }
                                                        } else {
                                                            connect6.close();
                                                        }
                                                    }
                                                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic7 = consumeRecordsByTopic(1);
                                                    Assertions.assertThat(consumeRecordsByTopic7.topics().size()).isEqualTo(1);
                                                    List recordsForTopic4 = consumeRecordsByTopic7.recordsForTopic(this.DATABASE.topicForTable("products_on_hand"));
                                                    Assertions.assertThat(recordsForTopic4.size()).isEqualTo(1);
                                                    assertUpdate((SourceRecord) recordsForTopic4.get(0), "product_id", 109);
                                                    recordsForTopic4.forEach(this::validate);
                                                    Testing.print("*** Done with verifying no additional events");
                                                    stopConnector();
                                                    Testing.print("*** Restarting connector");
                                                    EmbeddedEngine.CompletionResult completionResult = new EmbeddedEngine.CompletionResult();
                                                    start(MySqlConnector.class, this.config, completionResult, sourceRecord3 -> {
                                                        return ((Number) ((Struct) sourceRecord3.key()).get("id")).intValue() == 3003;
                                                    });
                                                    BinlogPosition binlogPosition = new BinlogPosition();
                                                    BinlogPosition binlogPosition2 = new BinlogPosition();
                                                    BinlogPosition binlogPosition3 = new BinlogPosition();
                                                    MySQLConnection forTestDatabase8 = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                                                    Throwable th25 = null;
                                                    try {
                                                        JdbcConnection connect7 = forTestDatabase8.connect();
                                                        Throwable th26 = null;
                                                        try {
                                                            try {
                                                                binlogPosition.getClass();
                                                                connect7.query("SHOW MASTER STATUS", binlogPosition::readFromDatabase);
                                                                connect7.execute(new String[]{"INSERT INTO products(id,name,description,weight,volume,alias) VALUES (3001,'ashley','super robot',34.56,0.00,'ashbot'), (3002,'arthur','motorcycle',87.65,0.00,'arcycle'), (3003,'oak','tree',987.65,0.00,'oak');"});
                                                                connect7.query("SELECT * FROM products", resultSet8 -> {
                                                                    if (Testing.Print.isEnabled()) {
                                                                        connect7.print(resultSet8);
                                                                    }
                                                                });
                                                                binlogPosition2.getClass();
                                                                connect7.query("SHOW MASTER STATUS", binlogPosition2::readFromDatabase);
                                                                connect7.execute(new String[]{"UPDATE products_on_hand SET quantity=40 WHERE product_id=109"});
                                                                connect7.query("SELECT * FROM products_on_hand", resultSet9 -> {
                                                                    if (Testing.Print.isEnabled()) {
                                                                        connect7.print(resultSet9);
                                                                    }
                                                                });
                                                                binlogPosition3.getClass();
                                                                connect7.query("SHOW MASTER STATUS", binlogPosition3::readFromDatabase);
                                                                if (connect7 != null) {
                                                                    if (0 != 0) {
                                                                        try {
                                                                            connect7.close();
                                                                        } catch (Throwable th27) {
                                                                            th26.addSuppressed(th27);
                                                                        }
                                                                    } else {
                                                                        connect7.close();
                                                                    }
                                                                }
                                                                AbstractConnectorTest.SourceRecords consumeRecordsByTopic8 = consumeRecordsByTopic(2);
                                                                Assertions.assertThat(consumeRecordsByTopic8.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(2);
                                                                Assertions.assertThat(consumeRecordsByTopic8.topics().size()).isEqualTo(1);
                                                                List recordsForTopic5 = consumeRecordsByTopic8.recordsForTopic(this.DATABASE.topicForTable("products"));
                                                                assertInsert((SourceRecord) recordsForTopic5.get(0), "id", 3001);
                                                                assertInsert((SourceRecord) recordsForTopic5.get(1), "id", 3002);
                                                                completionResult.await(10L, TimeUnit.SECONDS);
                                                                Assertions.assertThat(completionResult.hasCompleted()).isTrue();
                                                                Assertions.assertThat(completionResult.hasError()).isTrue();
                                                                Assertions.assertThat(completionResult.success()).isFalse();
                                                                assertNoRecordsToConsume();
                                                                assertConnectorNotRunning();
                                                                stopConnector();
                                                                SourceInfo sourceInfo = new SourceInfo(new MySqlConnectorConfig(Configuration.create().with(MySqlConnectorConfig.SERVER_NAME, this.config.getString(MySqlConnectorConfig.SERVER_NAME)).build()));
                                                                Map readLastCommittedOffset = readLastCommittedOffset(this.config, sourceInfo.partition());
                                                                sourceInfo.setOffset(readLastCommittedOffset);
                                                                Testing.print("Position before inserts: " + binlogPosition);
                                                                Testing.print("Position after inserts:  " + binlogPosition2);
                                                                Testing.print("Offset: " + readLastCommittedOffset);
                                                                Testing.print("Position after update:  " + binlogPosition3);
                                                                if (equals) {
                                                                    Assertions.assertThat(sourceInfo.binlogFilename()).isEqualTo(binlogPosition.binlogFilename());
                                                                    Assertions.assertThat(sourceInfo.binlogFilename()).isEqualTo(binlogPosition2.binlogFilename());
                                                                    Assertions.assertThat(sourceInfo.binlogPosition()).isGreaterThan(binlogPosition.binlogPosition());
                                                                    Assertions.assertThat(sourceInfo.binlogPosition()).isLessThan(binlogPosition2.binlogPosition());
                                                                }
                                                                Assertions.assertThat(sourceInfo.eventsToSkipUponRestart()).isEqualTo(2L);
                                                                Testing.print("*** Restarting connector, and should begin with inserting 3003 (not 109!)");
                                                                start(MySqlConnector.class, this.config);
                                                                AbstractConnectorTest.SourceRecords consumeRecordsByTopic9 = consumeRecordsByTopic(1);
                                                                Assertions.assertThat(consumeRecordsByTopic9.topics().size()).isEqualTo(1);
                                                                List recordsForTopic6 = consumeRecordsByTopic9.recordsForTopic(this.DATABASE.topicForTable("products"));
                                                                if (recordsForTopic6 == null && consumeRecordsByTopic9.recordsForTopic(this.DATABASE.topicForTable("products_on_hand")) != null) {
                                                                    Assert.fail("Restarted connector and missed the insert of product id=3003!");
                                                                }
                                                                SourceRecord sourceRecord4 = (SourceRecord) recordsForTopic6.get(0);
                                                                assertInsert(sourceRecord4, "id", 3003);
                                                                assertOffset(sourceRecord4, "file", readLastCommittedOffset.get("file"));
                                                                assertOffset(sourceRecord4, "pos", readLastCommittedOffset.get("pos"));
                                                                assertOffset(sourceRecord4, "row", 3);
                                                                assertOffset(sourceRecord4, "event", readLastCommittedOffset.get("event"));
                                                                assertValueField(sourceRecord4, "after/id", 3003);
                                                                assertValueField(sourceRecord4, "after/name", "oak");
                                                                assertValueField(sourceRecord4, "after/description", "tree");
                                                                assertValueField(sourceRecord4, "after/weight", Double.valueOf(987.65d));
                                                                assertValueField(sourceRecord4, "after/volume", Double.valueOf(0.0d));
                                                                assertValueField(sourceRecord4, "after/alias", "oak");
                                                                AbstractConnectorTest.SourceRecords consumeRecordsByTopic10 = consumeRecordsByTopic(1);
                                                                Assertions.assertThat(consumeRecordsByTopic10.topics().size()).isEqualTo(1);
                                                                List recordsForTopic7 = consumeRecordsByTopic10.recordsForTopic(this.DATABASE.topicForTable("products_on_hand"));
                                                                Assertions.assertThat(recordsForTopic7.size()).isEqualTo(1);
                                                                assertUpdate((SourceRecord) recordsForTopic7.get(0), "product_id", 109);
                                                                recordsForTopic7.forEach(this::validate);
                                                                Testing.print("*** Done with simple insert");
                                                            } finally {
                                                            }
                                                        } finally {
                                                            if (connect7 != null) {
                                                                if (th26 != null) {
                                                                    try {
                                                                        connect7.close();
                                                                    } catch (Throwable th28) {
                                                                        th26.addSuppressed(th28);
                                                                    }
                                                                } else {
                                                                    connect7.close();
                                                                }
                                                            }
                                                        }
                                                    } finally {
                                                        if (forTestDatabase8 != null) {
                                                            if (0 != 0) {
                                                                try {
                                                                    forTestDatabase8.close();
                                                                } catch (Throwable th29) {
                                                                    th25.addSuppressed(th29);
                                                                }
                                                            } else {
                                                                forTestDatabase8.close();
                                                            }
                                                        }
                                                    }
                                                } catch (Throwable th30) {
                                                    if (connect6 != null) {
                                                        if (0 != 0) {
                                                            try {
                                                                connect6.close();
                                                            } catch (Throwable th31) {
                                                                th23.addSuppressed(th31);
                                                            }
                                                        } else {
                                                            connect6.close();
                                                        }
                                                    }
                                                    throw th30;
                                                }
                                            } finally {
                                                if (forTestDatabase7 != null) {
                                                    if (0 != 0) {
                                                        try {
                                                            forTestDatabase7.close();
                                                        } catch (Throwable th32) {
                                                            th22.addSuppressed(th32);
                                                        }
                                                    } else {
                                                        forTestDatabase7.close();
                                                    }
                                                }
                                            }
                                        } catch (Throwable th33) {
                                            if (connect5 != null) {
                                                if (0 != 0) {
                                                    try {
                                                        connect5.close();
                                                    } catch (Throwable th34) {
                                                        th20.addSuppressed(th34);
                                                    }
                                                } else {
                                                    connect5.close();
                                                }
                                            }
                                            throw th33;
                                        }
                                    } finally {
                                        if (forTestDatabase != null) {
                                            if (0 != 0) {
                                                try {
                                                    forTestDatabase.close();
                                                } catch (Throwable th35) {
                                                    th2.addSuppressed(th35);
                                                }
                                            } else {
                                                forTestDatabase.close();
                                            }
                                        }
                                    }
                                } finally {
                                }
                            } finally {
                                if (forTestDatabase5 != null) {
                                    if (0 != 0) {
                                        try {
                                            forTestDatabase5.close();
                                        } catch (Throwable th36) {
                                            th12.addSuppressed(th36);
                                        }
                                    } else {
                                        forTestDatabase5.close();
                                    }
                                }
                            }
                        } catch (Throwable th37) {
                            if (connect3 != null) {
                                if (0 != 0) {
                                    try {
                                        connect3.close();
                                    } catch (Throwable th38) {
                                        th10.addSuppressed(th38);
                                    }
                                } else {
                                    connect3.close();
                                }
                            }
                            throw th37;
                        }
                    } finally {
                        if (forTestDatabase4 != null) {
                            if (0 != 0) {
                                try {
                                    forTestDatabase4.close();
                                } catch (Throwable th39) {
                                    th9.addSuppressed(th39);
                                }
                            } else {
                                forTestDatabase4.close();
                            }
                        }
                    }
                } catch (Throwable th40) {
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th41) {
                                th.addSuppressed(th41);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    throw th40;
                }
            } finally {
            }
        } finally {
            if (forTestDatabase2 != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase2.close();
                    } catch (Throwable th42) {
                        th3.addSuppressed(th42);
                    }
                } else {
                    forTestDatabase2.close();
                }
            }
        }
    }

    @Test
    public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLException, InterruptedException {
        if (!System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"))) {
            Thread.sleep(5000L);
        }
        this.config = Configuration.create().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost")).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306")).with(MySqlConnectorConfig.USER, "snapper").with(MySqlConnectorConfig.PASSWORD, "snapperpass").with(MySqlConnectorConfig.SERVER_ID, 28765).with(MySqlConnectorConfig.SERVER_NAME, this.DATABASE.getServerName()).with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(MySqlConnectorConfig.DATABASE_WHITELIST, this.DATABASE.getDatabaseName()).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.getDatabaseName() + ".products").with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, this.DATABASE.getDatabaseName() + ".products").with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + this.DATABASE.getDatabaseName() + ".products", String.format("SELECT * from %s.products where id>=108 order by id", this.DATABASE.getDatabaseName())).with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true).with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH).build();
        start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(8);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.getServerName()).size()).isEqualTo(6);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(2);
        Assertions.assertThat(((Struct) ((SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).get(0)).key()).getInt32("id")).isEqualTo(108);
        Assertions.assertThat(((Struct) ((SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).get(1)).key()).getInt32("id")).isEqualTo(109);
        consumeRecordsByTopic.forEach(this::validate);
    }

    @Test
    public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() throws SQLException, InterruptedException {
        if (!System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"))) {
            Thread.sleep(5000L);
        }
        String format = String.format("%s.products,%s.products_on_hand", this.DATABASE.getDatabaseName(), this.DATABASE.getDatabaseName());
        this.config = Configuration.create().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost")).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306")).with(MySqlConnectorConfig.USER, "snapper").with(MySqlConnectorConfig.PASSWORD, "snapperpass").with(MySqlConnectorConfig.SERVER_ID, 28765).with(MySqlConnectorConfig.SERVER_NAME, this.DATABASE.getServerName()).with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(MySqlConnectorConfig.DATABASE_WHITELIST, this.DATABASE.getDatabaseName()).with(MySqlConnectorConfig.TABLE_WHITELIST, format).with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true).with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, format).with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + this.DATABASE.getDatabaseName() + ".products", String.format("SELECT * from %s.products where id>=108 order by id", this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + this.DATABASE.getDatabaseName() + ".products_on_hand", String.format("SELECT * from %s.products_on_hand where product_id>=108 order by product_id", this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH).build();
        start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(12);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.getServerName()).size()).isEqualTo(8);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(2);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(2);
        Assertions.assertThat(((Struct) ((SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).get(0)).key()).getInt32("id")).isEqualTo(108);
        Assertions.assertThat(((Struct) ((SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).get(1)).key()).getInt32("id")).isEqualTo(109);
        Assertions.assertThat(((Struct) ((SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products_on_hand")).get(0)).key()).getInt32("product_id")).isEqualTo(108);
        Assertions.assertThat(((Struct) ((SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products_on_hand")).get(1)).key()).getInt32("product_id")).isEqualTo(109);
        consumeRecordsByTopic.forEach(this::validate);
    }

    @Test
    @FixFor({"DBZ-977"})
    public void shouldIgnoreAlterTableForNonCapturedTablesNotStoredInHistory() throws SQLException, InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.TABLE_WHITELIST, String.format("%s.customers", this.DATABASE.getDatabaseName(), this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true).build();
        start(MySqlConnector.class, this.config);
        Assertions.assertThat(consumeRecordsByTopic(6).ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"ALTER TABLE orders ADD COLUMN (newcol INT)"});
                    connect.execute(new String[]{"ALTER TABLE customers ADD COLUMN (newcol INT)"});
                    connect.execute(new String[]{"INSERT INTO customers VALUES (default,'name','surname','email',1);"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("customers")).size()).isEqualTo(1);
                    Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(1);
                    stopConnector();
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-1201"})
    public void shouldSaveSetCharacterSetWhenStoringOnlyMonitoredTables() throws SQLException, InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.DATABASE_WHITELIST, "no_" + this.DATABASE.getDatabaseName()).with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true).build();
        start(MySqlConnector.class, this.config);
        Assertions.assertThat(consumeRecordsByTopic(1).ddlRecordsForDatabase("").size()).isEqualTo(1);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1246"})
    public void shouldProcessCreateUniqueIndex() throws SQLException, InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.TABLE_WHITELIST, String.format("%s.migration_test", this.DATABASE.getDatabaseName(), this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
        start(MySqlConnector.class, this.config);
        waitForStreamingRunning(this.DATABASE.getServerName());
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"create table migration_test (id varchar(20) null,mgb_no varchar(20) null)", "create unique index migration_test_mgb_no_uindex on migration_test (mgb_no)", "insert into migration_test values(1,'2')"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(16);
                    List recordsForTopic = consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("migration_test"));
                    Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
                    Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(0)).key()).getString("mgb_no")).isEqualTo("2");
                    Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(13);
                    stopConnector();
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-977"})
    public void shouldIgnoreAlterTableForNonCapturedTablesStoredInHistory() throws SQLException, InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.TABLE_WHITELIST, String.format("%s.customers", this.DATABASE.getDatabaseName(), this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
        dropDatabases();
        start(MySqlConnector.class, this.config);
        Assertions.assertThat(consumeRecordsByTopic(12).ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(11);
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"ALTER TABLE orders ADD COLUMN (newcol INT)"});
                    connect.execute(new String[]{"ALTER TABLE customers ADD COLUMN (newcol INT)"});
                    connect.execute(new String[]{"INSERT INTO customers VALUES (default,'name','surname','email',1);"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(3);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("customers")).size()).isEqualTo(1);
                    Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(2);
                    stopConnector();
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-1264"})
    public void shouldIgnoreCreateIndexForNonCapturedTablesNotStoredInHistory() throws SQLException, InterruptedException {
        MySQLConnection forTestDatabase;
        Throwable th;
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.TABLE_WHITELIST, String.format("%s.customers", this.DATABASE.getDatabaseName(), this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true).build();
        MySQLConnection forTestDatabase2 = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th2 = null;
        try {
            JdbcConnection connect = forTestDatabase2.connect();
            Throwable th3 = null;
            try {
                try {
                    connect.execute(new String[]{"CREATE TABLE nonmon (id INT)"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    start(MySqlConnector.class, this.config);
                    Assertions.assertThat(consumeRecordsByTopic(6).ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
                    forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                    th = null;
                } finally {
                }
                try {
                    connect = forTestDatabase.connect();
                    Throwable th5 = null;
                    try {
                        try {
                            connect.execute(new String[]{"CREATE UNIQUE INDEX pk ON nonmon(id)", "INSERT INTO customers VALUES (default,'name','surname','email');"});
                            if (connect != null) {
                                if (0 != 0) {
                                    try {
                                        connect.close();
                                    } catch (Throwable th6) {
                                        th5.addSuppressed(th6);
                                    }
                                } else {
                                    connect.close();
                                }
                            }
                            Assertions.assertThat(consumeRecord().topic()).isEqualTo(this.DATABASE.topicForTable("customers"));
                        } finally {
                        }
                    } finally {
                    }
                } finally {
                    if (forTestDatabase != null) {
                        if (0 != 0) {
                            try {
                                forTestDatabase.close();
                            } catch (Throwable th7) {
                                th.addSuppressed(th7);
                            }
                        } else {
                            forTestDatabase.close();
                        }
                    }
                }
            } finally {
            }
        } finally {
            if (forTestDatabase2 != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase2.close();
                    } catch (Throwable th8) {
                        th2.addSuppressed(th8);
                    }
                } else {
                    forTestDatabase2.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-683"})
    public void shouldReceiveSchemaForNonWhitelistedTablesAndDatabases() throws SQLException, InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.TABLE_WHITELIST, String.format("%s.customers,%s.orders", this.DATABASE.getDatabaseName(), this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(MySqlConnectorConfig.DATABASE_WHITELIST, ".*").build();
        dropDatabases();
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase("mysql");
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                connect.execute(new String[]{"CREATE DATABASE non_wh", "USE non_wh", "CREATE TABLE t1 (ID INT PRIMARY KEY)"});
                if (connect != null) {
                    if (0 != 0) {
                        try {
                            connect.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        connect.close();
                    }
                }
                start(MySqlConnector.class, this.config);
                Assertions.assertThat(consumeRecordsByTopic(17).ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(11);
                stopConnector();
            } catch (Throwable th4) {
                if (connect != null) {
                    if (0 != 0) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-1546"})
    public void shouldHandleWhitelistedTables() throws SQLException, InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.TABLE_WHITELIST, String.format("%s.customers, %s.orders", this.DATABASE.getDatabaseName(), this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.DATABASE_WHITELIST, ".*").build();
        dropDatabases();
        start(MySqlConnector.class, this.config);
        Assertions.assertThat(consumeRecordsByTopic(17).ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(11);
        stopConnector();
    }

    private void dropDatabases() throws SQLException {
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase("mysql");
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.query("SHOW DATABASES", resultSet -> {
                        while (resultSet.next()) {
                            String string = resultSet.getString(1);
                            if (!Filters.isBuiltInDatabase(string) && !string.equals(this.DATABASE.getDatabaseName())) {
                                connect.execute(new String[]{"DROP DATABASE IF EXISTS " + string});
                            }
                        }
                    });
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    if (forTestDatabase != null) {
                        if (0 == 0) {
                            forTestDatabase.close();
                            return;
                        }
                        try {
                            forTestDatabase.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
            throw th8;
        }
    }

    private Struct getAfter(SourceRecord sourceRecord) {
        return (Struct) ((Struct) sourceRecord.value()).get("after");
    }

    @Test
    public void shouldConsumeEventsWithNoSnapshot() throws SQLException, InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.RO_DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
        start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(INITIAL_EVENT_COUNT);
        Assertions.assertThat(recordsForTopicForRoProductsTable(consumeRecordsByTopic).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.RO_DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.RO_DATABASE.topicForTable("customers")).size()).isEqualTo(4);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.RO_DATABASE.topicForTable("orders")).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.RO_DATABASE.topicForTable("Products")).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase(this.RO_DATABASE.getDatabaseName()).size()).isEqualTo(6);
        Optional findFirst = consumeRecordsByTopic.recordsForTopic(this.RO_DATABASE.topicForTable("Products")).stream().filter(sourceRecord -> {
            return "hammer2".equals(getAfter(sourceRecord).get("name"));
        }).findFirst();
        Assertions.assertThat(findFirst.isPresent());
        Assertions.assertThat(getAfter((SourceRecord) findFirst.get()).get("weight")).isEqualTo(Double.valueOf(0.875d));
        consumeRecordsByTopic.forEach(this::validate);
        stopConnector();
        consumeRecordsByTopic.recordsForTopic(this.RO_DATABASE.topicForTable("orders")).forEach(sourceRecord2 -> {
            print(sourceRecord2);
        });
        consumeRecordsByTopic.recordsForTopic(this.RO_DATABASE.topicForTable("customers")).forEach(sourceRecord3 -> {
            print(sourceRecord3);
        });
    }

    @Test
    public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLException, InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.RO_DATABASE.defaultConfig().with(MySqlConnectorConfig.COLUMN_BLACKLIST, this.RO_DATABASE.qualifiedTableName("orders") + ".order_number").with(MySqlConnectorConfig.MASK_COLUMN(12), this.RO_DATABASE.qualifiedTableName("customers") + ".email").with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).build();
        start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(28);
        Assertions.assertThat(recordsForTopicForRoProductsTable(consumeRecordsByTopic).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.RO_DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(PRODUCTS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.RO_DATABASE.topicForTable("customers")).size()).isEqualTo(4);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.RO_DATABASE.topicForTable("orders")).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        consumeRecordsByTopic.forEach(this::validate);
        stopConnector();
        consumeRecordsByTopic.recordsForTopic(this.RO_DATABASE.topicForTable("orders")).forEach(sourceRecord -> {
            print(sourceRecord);
            try {
                ((Struct) sourceRecord.value()).get("order_number");
                Assert.fail("The 'order_number' field was found but should not exist");
            } catch (DataException e) {
            }
        });
        consumeRecordsByTopic.recordsForTopic(this.RO_DATABASE.topicForTable("customers")).forEach(sourceRecord2 -> {
            Struct struct = (Struct) sourceRecord2.value();
            if (struct.getStruct("after") != null) {
                Assertions.assertThat(struct.getStruct("after").getString("email")).isEqualTo("************");
            }
            if (struct.getStruct("before") != null) {
                Assertions.assertThat(struct.getStruct("before").getString("email")).isEqualTo("************");
            }
            print(sourceRecord2);
        });
    }

    @Test
    @FixFor({"DBZ-582"})
    public void shouldEmitTombstoneOnDeleteByDefault() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER).build();
        start(MySqlConnector.class, this.config);
        Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"UPDATE orders SET order_number=10101 WHERE order_number=10001"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    List recordsForTopic = consumeRecordsByTopic(3).recordsForTopic(this.DATABASE.topicForTable("orders"));
                    Assertions.assertThat(recordsForTopic.size()).isEqualTo(3);
                    assertDelete((SourceRecord) recordsForTopic.get(0), "order_number", 10001);
                    assertTombstone((SourceRecord) recordsForTopic.get(1), "order_number", 10001);
                    assertInsert((SourceRecord) recordsForTopic.get(2), "order_number", 10101);
                    MySQLConnection forTestDatabase2 = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                    Throwable th4 = null;
                    try {
                        JdbcConnection connect2 = forTestDatabase2.connect();
                        Throwable th5 = null;
                        try {
                            connect2.execute(new String[]{"DELETE FROM orders WHERE order_number=10101"});
                            if (connect2 != null) {
                                if (0 != 0) {
                                    try {
                                        connect2.close();
                                    } catch (Throwable th6) {
                                        th5.addSuppressed(th6);
                                    }
                                } else {
                                    connect2.close();
                                }
                            }
                            List recordsForTopic2 = consumeRecordsByTopic(2).recordsForTopic(this.DATABASE.topicForTable("orders"));
                            Assertions.assertThat(recordsForTopic2.size()).isEqualTo(2);
                            assertDelete((SourceRecord) recordsForTopic2.get(0), "order_number", 10101);
                            assertTombstone((SourceRecord) recordsForTopic2.get(1), "order_number", 10101);
                            stopConnector();
                        } catch (Throwable th7) {
                            if (connect2 != null) {
                                if (0 != 0) {
                                    try {
                                        connect2.close();
                                    } catch (Throwable th8) {
                                        th5.addSuppressed(th8);
                                    }
                                } else {
                                    connect2.close();
                                }
                            }
                            throw th7;
                        }
                    } finally {
                        if (forTestDatabase2 != null) {
                            if (0 != 0) {
                                try {
                                    forTestDatabase2.close();
                                } catch (Throwable th9) {
                                    th4.addSuppressed(th9);
                                }
                            } else {
                                forTestDatabase2.close();
                            }
                        }
                    }
                } finally {
                }
            } catch (Throwable th10) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th11) {
                            th2.addSuppressed(th11);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th10;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th12) {
                        th.addSuppressed(th12);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-582"})
    public void shouldEmitNoTombstoneOnDelete() throws Exception {
        MySQLConnection forTestDatabase;
        Throwable th;
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).build();
        start(MySqlConnector.class, this.config);
        Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        MySQLConnection forTestDatabase2 = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th2 = null;
        try {
            JdbcConnection connect = forTestDatabase2.connect();
            Throwable th3 = null;
            try {
                try {
                    connect.execute(new String[]{"UPDATE orders SET order_number=10101 WHERE order_number=10001"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    List recordsForTopic = consumeRecordsByTopic(2).recordsForTopic(this.DATABASE.topicForTable("orders"));
                    Assertions.assertThat(recordsForTopic.size()).isEqualTo(2);
                    assertDelete((SourceRecord) recordsForTopic.get(0), "order_number", 10001);
                    assertInsert((SourceRecord) recordsForTopic.get(1), "order_number", 10101);
                    forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                    th = null;
                } finally {
                }
                try {
                    connect = forTestDatabase.connect();
                    Throwable th5 = null;
                    try {
                        try {
                            connect.execute(new String[]{"DELETE FROM orders WHERE order_number = 10101;"});
                            connect.execute(new String[]{"DELETE FROM orders WHERE order_number = 10002;"});
                            if (connect != null) {
                                if (0 != 0) {
                                    try {
                                        connect.close();
                                    } catch (Throwable th6) {
                                        th5.addSuppressed(th6);
                                    }
                                } else {
                                    connect.close();
                                }
                            }
                            List recordsForTopic2 = consumeRecordsByTopic(2).recordsForTopic(this.DATABASE.topicForTable("orders"));
                            Assertions.assertThat(recordsForTopic2.size()).isEqualTo(2);
                            assertDelete((SourceRecord) recordsForTopic2.get(0), "order_number", 10101);
                            assertDelete((SourceRecord) recordsForTopic2.get(1), "order_number", 10002);
                            stopConnector();
                        } finally {
                        }
                    } finally {
                    }
                } finally {
                    if (forTestDatabase != null) {
                        if (0 != 0) {
                            try {
                                forTestDatabase.close();
                            } catch (Throwable th7) {
                                th.addSuppressed(th7);
                            }
                        } else {
                            forTestDatabase.close();
                        }
                    }
                }
            } finally {
            }
        } finally {
            if (forTestDatabase2 != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase2.close();
                    } catch (Throwable th8) {
                        th2.addSuppressed(th8);
                    }
                } else {
                    forTestDatabase2.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-794"})
    public void shouldEmitNoSavepoints() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).build();
        start(MySqlConnector.class, this.config);
        Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(ORDERS_TABLE_EVENT_COUNT);
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    Connection connection = connect.connection();
                    connect.setAutoCommit(false);
                    Statement createStatement = connection.createStatement();
                    createStatement.executeUpdate("DELETE FROM orders WHERE order_number = 10001");
                    createStatement.executeUpdate("SavePoint sp2");
                    createStatement.executeUpdate("DELETE FROM orders WHERE order_number = 10002");
                    connection.commit();
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
                    Assertions.assertThat(consumeRecordsByTopic.ddlRecordsForDatabase(this.DATABASE.getDatabaseName())).isNullOrEmpty();
                    List recordsForTopic = consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("orders"));
                    assertDelete((SourceRecord) recordsForTopic.get(0), "order_number", 10001);
                    assertDelete((SourceRecord) recordsForTopic.get(1), "order_number", 10002);
                    stopConnector();
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-706"})
    public void shouldNotParseQueryIfServerOptionDisabled() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.qualifiedTableName("products")).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true).build();
        start(MySqlConnector.class, this.config);
        consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"SET binlog_rows_query_log_events=OFF"});
                    connect.execute(new String[]{"INSERT INTO products VALUES (default,'robot','Toy robot',1.304)"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(1);
                    SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).get(0);
                    validate(sourceRecord);
                    assertInsert(sourceRecord, "id", 110);
                    assertHasNoSourceQuery(sourceRecord);
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-706"})
    public void shouldNotParseQueryIfConnectorNotConfiguredTo() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.qualifiedTableName("products")).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, false).build();
        start(MySqlConnector.class, this.config);
        consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"SET binlog_rows_query_log_events=ON"});
                    connect.execute(new String[]{"INSERT INTO products VALUES (default,'robot','Toy robot',1.304)"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(1);
                    SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).get(0);
                    this.logger.info("Record: {}", sourceRecord);
                    validate(sourceRecord);
                    assertInsert(sourceRecord, "id", 110);
                    assertHasNoSourceQuery(sourceRecord);
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-706"})
    public void shouldParseQueryIfAvailableAndConnectorOptionEnabled() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.qualifiedTableName("products")).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true).build();
        start(MySqlConnector.class, this.config);
        consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"SET binlog_rows_query_log_events=ON"});
                    connect.execute(new String[]{"INSERT INTO products VALUES (default,'robot','Toy robot',1.304)"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(1);
                    SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).get(0);
                    this.logger.info("Record: {}", sourceRecord);
                    validate(sourceRecord);
                    assertInsert(sourceRecord, "id", 110);
                    assertSourceQuery(sourceRecord, "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)");
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-706"})
    public void parseMultipleInsertStatements() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.qualifiedTableName("products")).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true).build();
        start(MySqlConnector.class, this.config);
        consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
        this.logger.warn(this.DATABASE.getDatabaseName());
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"SET binlog_rows_query_log_events=ON"});
                    connect.execute(new String[]{"INSERT INTO products VALUES (default,'robot','Toy robot',1.304)"});
                    connect.execute(new String[]{"INSERT INTO products VALUES (default,'toaster','Toaster',3.33)"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(2);
                    SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).get(0);
                    validate(sourceRecord);
                    assertInsert(sourceRecord, "id", 110);
                    assertSourceQuery(sourceRecord, "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)");
                    SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).get(1);
                    validate(sourceRecord2);
                    assertInsert(sourceRecord2, "id", 111);
                    assertSourceQuery(sourceRecord2, "INSERT INTO products VALUES (default,'toaster','Toaster',3.33)");
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-706"})
    public void parseMultipleRowInsertStatement() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.qualifiedTableName("products")).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true).build();
        start(MySqlConnector.class, this.config);
        consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
        this.logger.warn(this.DATABASE.getDatabaseName());
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"SET binlog_rows_query_log_events=ON"});
                    connect.execute(new String[]{"INSERT INTO products VALUES (default,'robot','Toy robot',1.304), (default,'toaster','Toaster',3.33)"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(2);
                    SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).get(0);
                    validate(sourceRecord);
                    assertInsert(sourceRecord, "id", 110);
                    assertSourceQuery(sourceRecord, "INSERT INTO products VALUES (default,'robot','Toy robot',1.304), (default,'toaster','Toaster',3.33)");
                    SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).get(1);
                    validate(sourceRecord2);
                    assertInsert(sourceRecord2, "id", 111);
                    assertSourceQuery(sourceRecord2, "INSERT INTO products VALUES (default,'robot','Toy robot',1.304), (default,'toaster','Toaster',3.33)");
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-706"})
    public void parseDeleteQuery() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.qualifiedTableName("orders")).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true).build();
        start(MySqlConnector.class, this.config);
        consumeRecords(ORDERS_TABLE_EVENT_COUNT, null);
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"SET binlog_rows_query_log_events=ON"});
                    connect.execute(new String[]{"DELETE FROM orders WHERE order_number=10001 LIMIT 1"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(1);
                    SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("orders")).get(0);
                    validate(sourceRecord);
                    assertDelete(sourceRecord, "order_number", 10001);
                    assertSourceQuery(sourceRecord, "DELETE FROM orders WHERE order_number=10001 LIMIT 1");
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-706"})
    public void parseMultiRowDeleteQuery() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.qualifiedTableName("orders")).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true).build();
        start(MySqlConnector.class, this.config);
        consumeRecords(ORDERS_TABLE_EVENT_COUNT, null);
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"SET binlog_rows_query_log_events=ON"});
                    connect.execute(new String[]{"DELETE FROM orders WHERE purchaser=1002"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(2);
                    SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("orders")).get(0);
                    validate(sourceRecord);
                    assertDelete(sourceRecord, "order_number", 10002);
                    assertSourceQuery(sourceRecord, "DELETE FROM orders WHERE purchaser=1002");
                    SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("orders")).get(1);
                    validate(sourceRecord2);
                    assertDelete(sourceRecord2, "order_number", 10004);
                    assertSourceQuery(sourceRecord2, "DELETE FROM orders WHERE purchaser=1002");
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-706"})
    public void parseUpdateQuery() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.qualifiedTableName("products")).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true).build();
        start(MySqlConnector.class, this.config);
        consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"SET binlog_rows_query_log_events=ON"});
                    connect.execute(new String[]{"UPDATE products set name='toaster' where id=109 LIMIT 1"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(1);
                    SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("products")).get(0);
                    validate(sourceRecord);
                    assertUpdate(sourceRecord, "id", 109);
                    assertSourceQuery(sourceRecord, "UPDATE products set name='toaster' where id=109 LIMIT 1");
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-706"})
    public void parseMultiRowUpdateQuery() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.qualifiedTableName("orders")).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true).build();
        start(MySqlConnector.class, this.config);
        consumeRecords(ORDERS_TABLE_EVENT_COUNT, null);
        MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        Throwable th = null;
        try {
            JdbcConnection connect = forTestDatabase.connect();
            Throwable th2 = null;
            try {
                try {
                    connect.execute(new String[]{"SET binlog_rows_query_log_events=ON"});
                    connect.execute(new String[]{"UPDATE orders set quantity=0 where order_number in (10001, 10004)"});
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            connect.close();
                        }
                    }
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(2);
                    SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("orders")).get(0);
                    validate(sourceRecord);
                    assertUpdate(sourceRecord, "order_number", 10001);
                    assertSourceQuery(sourceRecord, "UPDATE orders set quantity=0 where order_number in (10001, 10004)");
                    SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("orders")).get(1);
                    validate(sourceRecord2);
                    assertUpdate(sourceRecord2, "order_number", 10004);
                    assertSourceQuery(sourceRecord2, "UPDATE orders set quantity=0 where order_number in (10001, 10004)");
                } finally {
                }
            } catch (Throwable th4) {
                if (connect != null) {
                    if (th2 != null) {
                        try {
                            connect.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        connect.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (forTestDatabase != null) {
                if (0 != 0) {
                    try {
                        forTestDatabase.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    forTestDatabase.close();
                }
            }
        }
    }

    @Test
    @FixFor({"DBZ-1234"})
    public void shouldFailToValidateAdaptivePrecisionMode() throws InterruptedException {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER).with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE).build();
        assertConfigurationErrors(new MySqlConnector().validate(this.config.asMap()), MySqlConnectorConfig.TIME_PRECISION_MODE);
    }

    @Test
    @FixFor({"DBZ-1242"})
    public void testEmptySchemaLogWarningWithDatabaseWhitelist() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL).with(MySqlConnectorConfig.DATABASE_WHITELIST, "my_database").build();
        start(MySqlConnector.class, this.config);
        consumeRecordsByTopic(12);
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsWarnMessage("After applying blacklist/whitelist filters there are no tables to monitor, please check your configuration")).isTrue();
        });
    }

    @Test
    @FixFor({"DBZ-1242"})
    public void testNoEmptySchemaLogWarningWithDatabaseWhitelist() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL).build();
        start(MySqlConnector.class, this.config);
        consumeRecordsByTopic(12);
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsWarnMessage("After applying blacklist/whitelist filters there are no tables to monitor, please check your configuration")).isFalse();
        });
    }

    @Test
    @FixFor({"DBZ-1242"})
    public void testEmptySchemaWarningWithTableWhitelist() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.qualifiedTableName("my_products")).build();
        start(MySqlConnector.class, this.config);
        assertConnectorIsRunning();
        consumeRecordsByTopic(12);
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsWarnMessage("After applying blacklist/whitelist filters there are no tables to monitor, please check your configuration")).isTrue();
        });
    }

    @Test
    @FixFor({"DBZ-1242"})
    public void testNoEmptySchemaWarningWithTableWhitelist() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL).build();
        start(MySqlConnector.class, this.config);
        assertConnectorIsRunning();
        consumeRecordsByTopic(12);
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsWarnMessage("After applying blacklist/whitelist filters there are no tables to monitor, please check your configuration")).isFalse();
        });
    }

    @Test
    @FixFor({"DBZ-1015"})
    public void shouldRewriteIdentityKey() throws InterruptedException, SQLException {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.qualifiedTableName("products")).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true).with(MySqlConnectorConfig.MSG_KEY_COLUMNS, "(.*).products:id,name").build();
        start(MySqlConnector.class, this.config);
        consumeRecordsByTopic(PRODUCTS_TABLE_EVENT_COUNT).recordsForTopic(this.DATABASE.topicForTable("products")).forEach(sourceRecord -> {
            Struct struct = (Struct) sourceRecord.key();
            Assertions.assertThat(struct.get("id")).isNotNull();
            Assertions.assertThat(struct.get("name")).isNotNull();
        });
    }

    @Test
    @FixFor({"DBZ-1292"})
    public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(MySqlConnectorConfig.TABLE_WHITELIST, this.DATABASE.qualifiedTableName("products")).build();
        start(MySqlConnector.class, this.config);
        for (SourceRecord sourceRecord : consumeRecordsByTopic(PRODUCTS_TABLE_EVENT_COUNT).recordsForTopic(this.DATABASE.topicForTable("products"))) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "mysql", "myServer1", false);
        }
    }

    private void waitForStreamingRunning(String str) throws InterruptedException {
        waitForStreamingRunning("mysql", str, "binlog");
    }

    private List<SourceRecord> recordsForTopicForRoProductsTable(AbstractConnectorTest.SourceRecords sourceRecords) {
        List<SourceRecord> recordsForTopic = sourceRecords.recordsForTopic(this.RO_DATABASE.topicForTable("Products"));
        return recordsForTopic != null ? recordsForTopic : sourceRecords.recordsForTopic(this.RO_DATABASE.topicForTable("products"));
    }
}
