package io.debezium.connector.mysql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.schema.DefaultRegexTopicNamingStrategy;
import io.debezium.schema.DefaultUnicodeTopicNamingStrategy;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mysql/MySqlTopicNamingStrategyIT.class */
public class MySqlTopicNamingStrategyIT extends AbstractAsyncEngineConnectorTest {
    private static final String TABLE_NAME = "dbz4180";
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-comment.txt").toAbsolutePath();
    private final UniqueDatabase DATABASE = new UniqueDatabase("topic_strategy", "strategy_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
    private Configuration config;

    @Before
    public void beforeEach() {
        stopConnector();
        this.DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        } catch (Throwable th) {
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-4180"})
    public void testSpecifyDelimiterAndPrefixStrategy() throws SQLException, InterruptedException {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(TABLE_NAME)).with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, "true").with(CommonConnectorConfig.TOPIC_PREFIX, "my_prefix").with(AbstractTopicNamingStrategy.TOPIC_DELIMITER, "_").build();
        start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(100);
        String join = String.join("_", "my_prefix", this.DATABASE.getDatabaseName(), TABLE_NAME);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(join).size()).isEqualTo(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("my_prefix").size()).isEqualTo(12);
        Connection connection = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName()).connection();
        try {
            connection.createStatement().execute("INSERT INTO dbz4180(a, b, c, d) VALUE (10.1, 10.2, 'strategy 1', 1290)");
            if (connection != null) {
                connection.close();
            }
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(join);
            Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
            Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(0)).value()).getStruct("after").getString("c")).isEqualTo("strategy 1");
            stopConnector();
        } catch (Throwable th) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-4180"})
    public void testSpecifyByLogicalTableStrategy() throws SQLException, InterruptedException {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("dbz_4180_00") + "," + this.DATABASE.qualifiedTableName("dbz_4180_01")).with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false").with(DefaultRegexTopicNamingStrategy.TOPIC_REGEX, "(.*)(dbz_4180)(.*)").with(DefaultRegexTopicNamingStrategy.TOPIC_REPLACEMENT, "$1$2_all_shards").with(DefaultRegexTopicNamingStrategy.TOPIC_KEY_FIELD_NAME, "origin_table_name").with(DefaultRegexTopicNamingStrategy.TOPIC_KEY_FIELD_REGEX, "(.*)").with(DefaultRegexTopicNamingStrategy.TOPIC_KEY_FIELD_REPLACEMENT, "it_$1").with(CommonConnectorConfig.TOPIC_NAMING_STRATEGY, "io.debezium.schema.DefaultRegexTopicNamingStrategy").build();
        start(MySqlConnector.class, this.config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());
        MySqlTestConnection forTestDatabase = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = forTestDatabase.connect();
            try {
                connect.execute(new String[]{"INSERT INTO dbz_4180_00(a, b, c, d) VALUE (10.1, 10.2, 'shard 0', 10);", "INSERT INTO dbz_4180_01(a, b, c, d) VALUE (10.1, 10.2, 'shard 1', 11);"});
                if (connect != null) {
                    connect.close();
                }
                if (forTestDatabase != null) {
                    forTestDatabase.close();
                }
                SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic(100).recordsForTopic(this.DATABASE.topicForTable("dbz_4180_all_shards")).get(0);
                Assertions.assertThat(sourceRecord.keySchema().field("origin_table_name")).isNotNull();
                Assertions.assertThat(((Struct) sourceRecord.key()).get("origin_table_name").toString().startsWith("it_")).isTrue();
                Assert.assertEquals(2L, r0.size());
                stopConnector();
            } finally {
            }
        } catch (Throwable th) {
            if (forTestDatabase != null) {
                try {
                    forTestDatabase.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-4180"})
    public void testSpecifyTransactionStrategy() throws SQLException, InterruptedException {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(TABLE_NAME)).with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false").with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, "true").with(AbstractTopicNamingStrategy.TOPIC_TRANSACTION, "my_transaction").build();
        start(MySqlConnector.class, this.config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());
        MySqlTestConnection forTestDatabase = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = forTestDatabase.connect();
            try {
                connect.setAutoCommit(false);
                connect.execute(new String[]{"INSERT INTO dbz4180(a, b, c, d) VALUE (10.1, 10.2, 'test transaction', 1290)"});
                connect.commit();
                if (connect != null) {
                    connect.close();
                }
                if (forTestDatabase != null) {
                    forTestDatabase.close();
                }
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(100);
                Assert.assertEquals(2L, consumeRecordsByTopic.recordsForTopic(this.DATABASE.getServerName() + ".my_transaction").size());
                Assert.assertEquals(3L, consumeRecordsByTopic.allRecordsInOrder().size());
                stopConnector();
            } finally {
            }
        } catch (Throwable th) {
            if (forTestDatabase != null) {
                try {
                    forTestDatabase.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-5743"})
    public void testUnicodeTopicNamingStrategy() throws SQLException, InterruptedException {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("dbz5743中文")).with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, "true").with(CommonConnectorConfig.TOPIC_NAMING_STRATEGY, "io.debezium.schema.DefaultUnicodeTopicNamingStrategy").build();
        start(MySqlConnector.class, this.config);
        assertConnectorIsRunning();
        Assert.assertEquals(1L, consumeRecordsByTopic(100).recordsForTopic(new DefaultUnicodeTopicNamingStrategy(this.config.asProperties()).dataChangeTopic(TableId.parse(String.join(".", this.DATABASE.getDatabaseName(), "dbz5743中文")))).size());
        stopConnector();
    }
}
