package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.SkipWhenKafkaVersion;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
import java.math.BigDecimal;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

@SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
/* loaded from: input_file:io/debezium/connector/mysql/MysqlDefaultValueIT.class */
public class MysqlDefaultValueIT extends AbstractConnectorTest {
    // Number of records each test asks consumeRecordsByTopic(...) to consume after starting the connector.
    private static final int EVENT_COUNT = 49;
    // Absolute path of the database-history file; deleted in beforeEach() and afterEach().
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
    // Per-instance test database built from "myServer1" / "default_value" and bound to DB_HISTORY_PATH.
    private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "default_value").withDbHistoryPath(DB_HISTORY_PATH);
    // Connector configuration; (re)assigned at the start of every test method.
    private Configuration config;

    /**
     * Per-test setup: stops any running connector, (re)creates the test database,
     * initializes the connector test framework and removes the history file.
     * The statement order is kept exactly as is.
     */
    @Before
    public void beforeEach() {
        stopConnector();
        DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        // Start every test without a leftover database-history file.
        Testing.Files.delete(DB_HISTORY_PATH);
    }

    /**
     * Per-test teardown: stops the connector and always removes the database
     * history file, even when stopping the connector fails.
     */
    @After
    public void afterEach() {
        try {
            stopConnector();
        } finally {
            // Single cleanup point: runs exactly once on both the normal and the
            // exceptional path, and any exception from stopConnector() still propagates.
            Testing.Files.delete(DB_HISTORY_PATH);
        }
    }

    /**
     * Starts the connector in {@code INITIAL} snapshot mode, takes the first record
     * published for {@code UNSIGNED_TINYINT_TABLE} and checks optionality and default
     * values of the column schemas at positions 0-5; defaults are expected as
     * {@code short} values. Field {@code G} of the record's {@code after} struct must
     * hold no explicit value.
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void unsignedTinyIntTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("UNSIGNED_TINYINT_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();
        final Schema col2 = rowSchema.fields().get(2).schema();
        final Schema col3 = rowSchema.fields().get(3).schema();
        final Schema col4 = rowSchema.fields().get(4).schema();
        final Schema col5 = rowSchema.fields().get(5).schema();

        Assertions.assertThat(col0.isOptional()).isTrue();
        Assertions.assertThat(col0.defaultValue()).isEqualTo((short) 0);
        Assertions.assertThat(col1.isOptional()).isTrue();
        Assertions.assertThat(col1.defaultValue()).isEqualTo((short) 10);
        Assertions.assertThat(col2.isOptional()).isTrue();
        Assertions.assertThat(col2.defaultValue()).isNull();
        // Column 3 is only checked for optionality, not for a default.
        Assertions.assertThat(col3.isOptional()).isFalse();
        Assertions.assertThat(col4.isOptional()).isFalse();
        Assertions.assertThat(col4.defaultValue()).isEqualTo((short) 0);
        Assertions.assertThat(col5.isOptional()).isFalse();
        Assertions.assertThat(col5.defaultValue()).isEqualTo((short) 0);

        assertEmptyFieldValue(record, "G");
    }

    /**
     * Starts the connector in {@code INITIAL} snapshot mode, takes the first record
     * published for {@code UNSIGNED_SMALLINT_TABLE} and checks optionality and default
     * values of the column schemas at positions 0-5; defaults are expected as
     * {@code int} values. Field {@code G} of the record's {@code after} struct must
     * hold no explicit value.
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void unsignedSmallIntTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("UNSIGNED_SMALLINT_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();
        final Schema col2 = rowSchema.fields().get(2).schema();
        final Schema col3 = rowSchema.fields().get(3).schema();
        final Schema col4 = rowSchema.fields().get(4).schema();
        final Schema col5 = rowSchema.fields().get(5).schema();

        Assertions.assertThat(col0.isOptional()).isTrue();
        Assertions.assertThat(col0.defaultValue()).isEqualTo(0);
        Assertions.assertThat(col1.isOptional()).isTrue();
        Assertions.assertThat(col1.defaultValue()).isEqualTo(10);
        Assertions.assertThat(col2.isOptional()).isTrue();
        Assertions.assertThat(col2.defaultValue()).isNull();
        // Column 3 is only checked for optionality, not for a default.
        Assertions.assertThat(col3.isOptional()).isFalse();
        Assertions.assertThat(col4.isOptional()).isFalse();
        Assertions.assertThat(col4.defaultValue()).isEqualTo(0);
        Assertions.assertThat(col5.isOptional()).isFalse();
        Assertions.assertThat(col5.defaultValue()).isEqualTo(0);

        assertEmptyFieldValue(record, "G");
    }

    /**
     * Asserts that the named field of the record's {@code after} struct holds no
     * explicit value; the lookup uses {@code getWithoutDefault}, so a schema default
     * is not substituted for the raw value.
     *
     * @param sourceRecord record whose value is a struct containing an {@code after} struct
     * @param str name of the field inside {@code after} to inspect
     */
    private void assertEmptyFieldValue(SourceRecord sourceRecord, String str) {
        final Struct envelope = (Struct) sourceRecord.value();
        final Struct after = (Struct) envelope.get("after");
        final Object rawValue = after.getWithoutDefault(str);
        Assertions.assertThat(rawValue).isNull();
    }

    /**
     * Starts the connector in {@code INITIAL} snapshot mode, takes the first record
     * published for {@code UNSIGNED_MEDIUMINT_TABLE} and checks optionality and default
     * values of the column schemas at positions 0-5; defaults are expected as
     * {@code int} values. Field {@code G} of the record's {@code after} struct must
     * hold no explicit value.
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void unsignedMediumIntTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("UNSIGNED_MEDIUMINT_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();
        final Schema col2 = rowSchema.fields().get(2).schema();
        final Schema col3 = rowSchema.fields().get(3).schema();
        final Schema col4 = rowSchema.fields().get(4).schema();
        final Schema col5 = rowSchema.fields().get(5).schema();

        Assertions.assertThat(col0.isOptional()).isTrue();
        Assertions.assertThat(col0.defaultValue()).isEqualTo(0);
        Assertions.assertThat(col1.isOptional()).isTrue();
        Assertions.assertThat(col1.defaultValue()).isEqualTo(10);
        Assertions.assertThat(col2.isOptional()).isTrue();
        Assertions.assertThat(col2.defaultValue()).isNull();
        // Column 3 is only checked for optionality, not for a default.
        Assertions.assertThat(col3.isOptional()).isFalse();
        Assertions.assertThat(col4.isOptional()).isFalse();
        Assertions.assertThat(col4.defaultValue()).isEqualTo(0);
        Assertions.assertThat(col5.isOptional()).isFalse();
        Assertions.assertThat(col5.defaultValue()).isEqualTo(0);

        assertEmptyFieldValue(record, "G");
    }

    /**
     * Starts the connector in {@code INITIAL} snapshot mode, takes the first record
     * published for {@code UNSIGNED_INT_TABLE} and checks optionality and default
     * values of the column schemas at positions 0-5; defaults are expected as
     * {@code long} values. Field {@code G} of the record's {@code after} struct must
     * hold no explicit value.
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void unsignedIntTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("UNSIGNED_INT_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();
        final Schema col2 = rowSchema.fields().get(2).schema();
        final Schema col3 = rowSchema.fields().get(3).schema();
        final Schema col4 = rowSchema.fields().get(4).schema();
        final Schema col5 = rowSchema.fields().get(5).schema();

        Assertions.assertThat(col0.isOptional()).isTrue();
        Assertions.assertThat(col0.defaultValue()).isEqualTo(0L);
        Assertions.assertThat(col1.isOptional()).isTrue();
        Assertions.assertThat(col1.defaultValue()).isEqualTo(10L);
        Assertions.assertThat(col2.isOptional()).isTrue();
        Assertions.assertThat(col2.defaultValue()).isNull();
        // Column 3 is only checked for optionality, not for a default.
        Assertions.assertThat(col3.isOptional()).isFalse();
        Assertions.assertThat(col4.isOptional()).isFalse();
        Assertions.assertThat(col4.defaultValue()).isEqualTo(0L);
        Assertions.assertThat(col5.isOptional()).isFalse();
        Assertions.assertThat(col5.defaultValue()).isEqualTo(0L);

        assertEmptyFieldValue(record, "G");
    }

    /**
     * Starts the connector in {@code INITIAL} snapshot mode (no explicit unsigned-bigint
     * handling mode is configured), takes the first record published for
     * {@code UNSIGNED_BIGINT_TABLE} and checks optionality and default values of the
     * column schemas at positions 0-5; defaults are expected as {@code long} values.
     * Field {@code G} of the record's {@code after} struct must hold no explicit value.
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void unsignedBigIntToLongTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("UNSIGNED_BIGINT_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();
        final Schema col2 = rowSchema.fields().get(2).schema();
        final Schema col3 = rowSchema.fields().get(3).schema();
        final Schema col4 = rowSchema.fields().get(4).schema();
        final Schema col5 = rowSchema.fields().get(5).schema();

        Assertions.assertThat(col0.isOptional()).isTrue();
        Assertions.assertThat(col0.defaultValue()).isEqualTo(0L);
        Assertions.assertThat(col1.isOptional()).isTrue();
        Assertions.assertThat(col1.defaultValue()).isEqualTo(10L);
        Assertions.assertThat(col2.isOptional()).isTrue();
        Assertions.assertThat(col2.defaultValue()).isNull();
        // Column 3 is only checked for optionality, not for a default.
        Assertions.assertThat(col3.isOptional()).isFalse();
        Assertions.assertThat(col4.isOptional()).isFalse();
        Assertions.assertThat(col4.defaultValue()).isEqualTo(0L);
        Assertions.assertThat(col5.isOptional()).isFalse();
        Assertions.assertThat(col5.defaultValue()).isEqualTo(0L);

        assertEmptyFieldValue(record, "G");
    }

    /**
     * Same checks as the long-based variant for {@code UNSIGNED_BIGINT_TABLE}, but with
     * {@code BIGINT_UNSIGNED_HANDLING_MODE} set to {@code PRECISE}: the defaults of the
     * column schemas at positions 0-5 are expected as {@link BigDecimal} values.
     * Field {@code G} of the record's {@code after} struct must hold no explicit value.
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void unsignedBigIntToBigDecimalTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .with(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE, JdbcValueConverters.BigIntUnsignedMode.PRECISE)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("UNSIGNED_BIGINT_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();
        final Schema col2 = rowSchema.fields().get(2).schema();
        final Schema col3 = rowSchema.fields().get(3).schema();
        final Schema col4 = rowSchema.fields().get(4).schema();
        final Schema col5 = rowSchema.fields().get(5).schema();

        Assertions.assertThat(col0.isOptional()).isTrue();
        Assertions.assertThat(col0.defaultValue()).isEqualTo(BigDecimal.ZERO);
        Assertions.assertThat(col1.isOptional()).isTrue();
        Assertions.assertThat(col1.defaultValue()).isEqualTo(new BigDecimal(10));
        Assertions.assertThat(col2.isOptional()).isTrue();
        Assertions.assertThat(col2.defaultValue()).isNull();
        // Column 3 is only checked for optionality, not for a default.
        Assertions.assertThat(col3.isOptional()).isFalse();
        Assertions.assertThat(col4.isOptional()).isFalse();
        Assertions.assertThat(col4.defaultValue()).isEqualTo(BigDecimal.ZERO);
        Assertions.assertThat(col5.isOptional()).isFalse();
        Assertions.assertThat(col5.defaultValue()).isEqualTo(BigDecimal.ZERO);

        assertEmptyFieldValue(record, "G");
    }

    /**
     * Starts the connector in {@code INITIAL} snapshot mode, takes the first record
     * published for {@code STRING_TABLE} and checks the default values of the column
     * schemas at positions 0-7 (five string defaults followed by three absent defaults).
     * Field {@code I} of the record's {@code after} struct must hold no explicit value.
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void stringTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("STRING_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();
        final Schema col2 = rowSchema.fields().get(2).schema();
        final Schema col3 = rowSchema.fields().get(3).schema();
        final Schema col4 = rowSchema.fields().get(4).schema();
        final Schema col5 = rowSchema.fields().get(5).schema();
        final Schema col6 = rowSchema.fields().get(6).schema();
        final Schema col7 = rowSchema.fields().get(7).schema();

        Assertions.assertThat(col0.defaultValue()).isEqualTo("A");
        Assertions.assertThat(col1.defaultValue()).isEqualTo("b");
        Assertions.assertThat(col2.defaultValue()).isEqualTo("CC");
        Assertions.assertThat(col3.defaultValue()).isEqualTo("10");
        Assertions.assertThat(col4.defaultValue()).isEqualTo("0");
        Assertions.assertThat(col5.defaultValue()).isNull();
        Assertions.assertThat(col6.defaultValue()).isNull();
        Assertions.assertThat(col7.defaultValue()).isNull();

        assertEmptyFieldValue(record, "I");
    }

    /**
     * Starts the connector in {@code INITIAL} snapshot mode, takes the first record
     * published for {@code BIT_TABLE} and checks the default values of the column
     * schemas at positions 0-9: booleans for positions 1-6, byte arrays for positions
     * 7 and 9, and no default for positions 0 and 8.
     * Field {@code K} of the record's {@code after} struct must hold no explicit value.
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void unsignedBitTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("BIT_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();
        final Schema col2 = rowSchema.fields().get(2).schema();
        final Schema col3 = rowSchema.fields().get(3).schema();
        final Schema col4 = rowSchema.fields().get(4).schema();
        final Schema col5 = rowSchema.fields().get(5).schema();
        final Schema col6 = rowSchema.fields().get(6).schema();
        final Schema col7 = rowSchema.fields().get(7).schema();
        final Schema col8 = rowSchema.fields().get(8).schema();
        final Schema col9 = rowSchema.fields().get(9).schema();

        Assertions.assertThat(col0.defaultValue()).isNull();
        Assertions.assertThat(col1.defaultValue()).isEqualTo(false);
        Assertions.assertThat(col2.defaultValue()).isEqualTo(true);
        Assertions.assertThat(col3.defaultValue()).isEqualTo(false);
        Assertions.assertThat(col4.defaultValue()).isEqualTo(true);
        Assertions.assertThat(col5.defaultValue()).isEqualTo(true);
        Assertions.assertThat(col6.defaultValue()).isEqualTo(false);
        Assertions.assertThat(col7.defaultValue()).isEqualTo(new byte[]{66, 1});
        Assertions.assertThat(col8.defaultValue()).isNull();
        Assertions.assertThat(col9.defaultValue()).isEqualTo(new byte[]{15, 97, 1, 0});

        assertEmptyFieldValue(record, "K");
    }

    /**
     * Starts the connector in {@code INITIAL} snapshot mode, takes the first record
     * published for {@code BOOLEAN_TABLE} and checks the default values of the column
     * schemas at positions 0-4; the defaults are expected as {@code short} values
     * (0 or 1), with no default at position 4.
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void booleanTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("BOOLEAN_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();
        final Schema col2 = rowSchema.fields().get(2).schema();
        final Schema col3 = rowSchema.fields().get(3).schema();
        final Schema col4 = rowSchema.fields().get(4).schema();

        Assertions.assertThat(col0.defaultValue()).isEqualTo((short) 0);
        Assertions.assertThat(col1.defaultValue()).isEqualTo((short) 1);
        Assertions.assertThat(col2.defaultValue()).isEqualTo((short) 1);
        Assertions.assertThat(col3.defaultValue()).isEqualTo((short) 1);
        Assertions.assertThat(col4.defaultValue()).isNull();
    }

    /**
     * Starts the connector in {@code INITIAL} snapshot mode, takes the first record
     * published for {@code NUMBER_TABLE} and checks the default values of the column
     * schemas at positions 0-4, 6 and 7 (position 5 is not inspected through its schema).
     * Field {@code F} of the record's {@code after} struct must hold no explicit value.
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void numberTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("NUMBER_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();
        final Schema col2 = rowSchema.fields().get(2).schema();
        final Schema col3 = rowSchema.fields().get(3).schema();
        final Schema col4 = rowSchema.fields().get(4).schema();
        final Schema col6 = rowSchema.fields().get(6).schema();
        final Schema col7 = rowSchema.fields().get(7).schema();

        Assertions.assertThat(col0.defaultValue()).isEqualTo((short) 10);
        Assertions.assertThat(col1.defaultValue()).isEqualTo((short) 5);
        Assertions.assertThat(col2.defaultValue()).isEqualTo(0);
        Assertions.assertThat(col3.defaultValue()).isEqualTo(20L);
        Assertions.assertThat(col4.defaultValue()).isNull();
        assertEmptyFieldValue(record, "F");
        Assertions.assertThat(col6.defaultValue()).isEqualTo((short) 1);
        Assertions.assertThat(col7.defaultValue()).isEqualTo(1);
    }

    /**
     * Verifies the schema defaults of {@code TINYINT} columns declared with boolean
     * literals as defaults.
     *
     * <p>After the connector has started and {@code EVENT_COUNT} records were consumed,
     * the test creates {@code ti_boolean_table} ({@code DEFAULT TRUE} / {@code DEFAULT FALSE}),
     * inserts one all-defaults row, and then checks that the first record on the table's
     * topic exposes the defaults as {@code short} 1 and 0.
     *
     * @throws Exception if the JDBC work fails or the test is interrupted while consuming records
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void tinyIntBooleanTest() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL).build();
        start(MySqlConnector.class, this.config);
        consumeRecordsByTopic(EVENT_COUNT);

        // try-with-resources closes the connection exactly once, before records are
        // consumed, and attaches a close() failure as a suppressed exception instead of
        // letting it replace the primary one.
        try (Connection connection = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName()).connection()) {
            connection.createStatement().execute("CREATE TABLE ti_boolean_table (  A TINYINT(1) NOT NULL DEFAULT TRUE,  B TINYINT(2) NOT NULL DEFAULT FALSE)");
            connection.createStatement().execute("INSERT INTO ti_boolean_table VALUES (default, default)");
        }

        SourceRecord sourceRecord = consumeRecordsByTopic(2).recordsForTopic(this.DATABASE.topicForTable("ti_boolean_table")).get(0);
        validate(sourceRecord);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        Schema rowSchema = sourceRecord.valueSchema().fields().get(1).schema();
        Schema columnA = rowSchema.fields().get(0).schema();
        Schema columnB = rowSchema.fields().get(1).schema();
        Assertions.assertThat(columnA.defaultValue()).isEqualTo((short) 1);
        Assertions.assertThat(columnB.defaultValue()).isEqualTo((short) 0);
    }

    /**
     * Verifies the schema defaults of {@code INT} columns declared with boolean
     * literals as defaults (DBZ-1689).
     *
     * <p>The test waits for the snapshot to complete and consumes {@code EVENT_COUNT}
     * records, then creates {@code int_boolean_table} ({@code DEFAULT TRUE} /
     * {@code DEFAULT FALSE}), inserts one all-defaults row, and checks that the first
     * record on the table's topic exposes the defaults as {@code int} 1 and 0.
     *
     * @throws Exception if the JDBC work fails or the test is interrupted while waiting or consuming
     */
    @Test
    @FixFor({"DBZ-1689"})
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void intBooleanTest() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL).build();
        start(MySqlConnector.class, this.config);
        Testing.Print.enable();
        waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());
        consumeRecordsByTopic(EVENT_COUNT);

        // try-with-resources closes the connection exactly once, before records are
        // consumed, and attaches a close() failure as a suppressed exception instead of
        // letting it replace the primary one.
        try (Connection connection = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName()).connection()) {
            connection.createStatement().execute("CREATE TABLE int_boolean_table (  A INT(1) NOT NULL DEFAULT TRUE,  B INT(2) NOT NULL DEFAULT FALSE)");
            connection.createStatement().execute("INSERT INTO int_boolean_table VALUES (default, default)");
        }

        SourceRecord sourceRecord = consumeRecordsByTopic(2).recordsForTopic(this.DATABASE.topicForTable("int_boolean_table")).get(0);
        validate(sourceRecord);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        Schema rowSchema = sourceRecord.valueSchema().fields().get(1).schema();
        Schema columnA = rowSchema.fields().get(0).schema();
        Schema columnB = rowSchema.fields().get(1).schema();
        Assertions.assertThat(columnA.defaultValue()).isEqualTo(1);
        Assertions.assertThat(columnB.defaultValue()).isEqualTo(0);
    }

    /**
     * Starts the connector in {@code INITIAL} snapshot mode, takes the first record
     * published for {@code FlOAT_DOUBLE_TABLE} and checks that the column schemas at
     * positions 0 and 1 carry the {@code Double} defaults 0.0 and 1.0.
     * Field {@code H} of the record's {@code after} struct must hold no explicit value.
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void floatAndDoubleTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("FlOAT_DOUBLE_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();

        Assertions.assertThat(col0.defaultValue()).isEqualTo(Double.valueOf(0.0d));
        Assertions.assertThat(col1.defaultValue()).isEqualTo(Double.valueOf(1.0d));

        assertEmptyFieldValue(record, "H");
    }

    /**
     * Starts the connector in {@code INITIAL} snapshot mode, takes the first record
     * published for {@code REAL_TABLE} and checks that the column schema at position 0
     * carries the {@code Double} default 1.0 while position 1 has no default.
     * Field {@code C} of the record's {@code after} struct must hold no explicit value.
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void realTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("REAL_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();

        Assertions.assertThat(col0.defaultValue()).isEqualTo(Double.valueOf(1.0d));
        Assertions.assertThat(col1.defaultValue()).isNull();

        assertEmptyFieldValue(record, "C");
    }

    /**
     * Starts the connector in {@code INITIAL} snapshot mode with
     * {@code DECIMAL_HANDLING_MODE} set to {@code DOUBLE}, takes the first record
     * published for {@code NUMERIC_DECIMAL_TABLE} and checks that the column schemas at
     * positions 0-2 carry the {@code Double} defaults 1.23, 2.321 and 12.678.
     * Field {@code D} of the record's {@code after} struct must hold no explicit value.
     */
    @Test
    public void numericAndDecimalToDoubleTest() throws InterruptedException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .with(MySqlConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE)
                .build();
        start(MySqlConnector.class, config);

        final SourceRecord record = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(DATABASE.topicForTable("NUMERIC_DECIMAL_TABLE"))
                .get(0);
        validate(record);

        // Struct schema stored at index 1 of the record's value schema; its fields are the table columns.
        final Schema rowSchema = record.valueSchema().fields().get(1).schema();
        final Schema col0 = rowSchema.fields().get(0).schema();
        final Schema col1 = rowSchema.fields().get(1).schema();
        final Schema col2 = rowSchema.fields().get(2).schema();

        Assertions.assertThat(col0.defaultValue()).isEqualTo(Double.valueOf(1.23d));
        Assertions.assertThat(col1.defaultValue()).isEqualTo(Double.valueOf(2.321d));
        Assertions.assertThat(col2.defaultValue()).isEqualTo(Double.valueOf(12.678d));

        assertEmptyFieldValue(record, "D");
    }

    /**
     * Checks the schema default values of {@code NUMERIC_DECIMAL_TABLE} when the connector is
     * configured with {@code DecimalHandlingMode.PRECISE}.
     *
     * <p>The first two fields of the struct schema at index 1 of the record value are expected to
     * default to {@code BigDecimal} values equal to {@code BigDecimal.valueOf(1.23)} and
     * {@code BigDecimal.valueOf(2.321)}.
     *
     * @throws InterruptedException propagated from the test-harness calls used here
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void numericAndDecimalToDecimalTest() throws InterruptedException {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .with(MySqlConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.PRECISE)
                .build();
        start(MySqlConnector.class, this.config);

        SourceRecord firstRecord = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(this.DATABASE.topicForTable("NUMERIC_DECIMAL_TABLE"))
                .get(0);
        validate(firstRecord);

        // Schema of the struct stored at index 1 of the record value.
        Schema rowSchema = firstRecord.valueSchema().fields().get(1).schema();
        Object firstDefault = rowSchema.fields().get(0).schema().defaultValue();
        Object secondDefault = rowSchema.fields().get(1).schema().defaultValue();

        Assertions.assertThat(firstDefault).isEqualTo(BigDecimal.valueOf(1.23d));
        Assertions.assertThat(secondDefault).isEqualTo(BigDecimal.valueOf(2.321d));
        assertEmptyFieldValue(firstRecord, "D");
    }

    /**
     * Checks the schema default values of the temporal columns of {@code DATE_TIME_TABLE} with the
     * connector's default time precision settings.
     *
     * <p>Only {@code DATE_TIME_TABLE} is included, and seven records are consumed. The defaults of
     * the fields at indexes 0-9, 11 and 12 of the struct schema at index 1 of the record value are
     * read up front and then compared, in the order below, against the expected encoded values.
     * Index 10 is not checked by this test.
     *
     * @throws InterruptedException propagated from the test-harness calls used here
     */
    @Test
    public void dateAndTimeTest() throws InterruptedException {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("DATE_TIME_TABLE"))
                .with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
                .build();
        start(MySqlConnector.class, this.config);

        SourceRecord firstRecord = consumeRecordsByTopic(7)
                .recordsForTopic(this.DATABASE.topicForTable("DATE_TIME_TABLE"))
                .get(0);
        validate(firstRecord);

        // Schema of the struct stored at index 1 of the record value.
        Schema rowSchema = firstRecord.valueSchema().fields().get(1).schema();
        Object defaultAt0 = rowSchema.fields().get(0).schema().defaultValue();
        Object defaultAt1 = rowSchema.fields().get(1).schema().defaultValue();
        Object defaultAt2 = rowSchema.fields().get(2).schema().defaultValue();
        Object defaultAt3 = rowSchema.fields().get(3).schema().defaultValue();
        Object defaultAt4 = rowSchema.fields().get(4).schema().defaultValue();
        Object defaultAt5 = rowSchema.fields().get(5).schema().defaultValue();
        Object defaultAt6 = rowSchema.fields().get(6).schema().defaultValue();
        Object defaultAt7 = rowSchema.fields().get(7).schema().defaultValue();
        Object defaultAt8 = rowSchema.fields().get(8).schema().defaultValue();
        Object defaultAt9 = rowSchema.fields().get(9).schema().defaultValue();
        Object defaultAt11 = rowSchema.fields().get(11).schema().defaultValue();
        Object defaultAt12 = rowSchema.fields().get(12).schema().defaultValue();

        Assertions.assertThat(defaultAt0).isEqualTo(2426);

        // The expected string is built in the JVM's default zone.
        ZoneId systemZone = ZoneId.systemDefault();
        String expectedAt1 = ZonedTimestamp.toIsoString(
                Timestamp.valueOf("1970-01-01 00:00:01").toInstant().atZone(systemZone),
                ZoneId.systemDefault(),
                MySqlValueConverters::adjustTemporal);
        Assertions.assertThat(defaultAt1).isEqualTo(expectedAt1);

        long expectedAt2 = io.debezium.time.Timestamp.toEpochMillis(
                LocalDateTime.parse("2018-01-03 00:00:10", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
                MySqlValueConverters::adjustTemporal);
        Assertions.assertThat(defaultAt2).isEqualTo(expectedAt2);

        long expectedAt3 = io.debezium.time.Timestamp.toEpochMillis(
                LocalDateTime.parse("2018-01-03 00:00:10.7", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S")),
                MySqlValueConverters::adjustTemporal);
        Assertions.assertThat(defaultAt3).isEqualTo(expectedAt3);

        long expectedAt4 = MicroTimestamp.toEpochMicros(
                LocalDateTime.parse("2018-01-03 00:00:10.123456", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")),
                MySqlValueConverters::adjustTemporal);
        Assertions.assertThat(defaultAt4).isEqualTo(expectedAt4);

        Assertions.assertThat(defaultAt5).isEqualTo(2001);
        Assertions.assertThat(defaultAt6).isEqualTo(0L);
        Assertions.assertThat(defaultAt7).isEqualTo(82800700000L);
        Assertions.assertThat(defaultAt8).isEqualTo(82800123456L);

        // Durations are converted from nanoseconds to microseconds before comparison.
        long expectedAt11 = Duration.ofHours(-23L).minusMinutes(45L).minusSeconds(56L).minusMillis(700L).toNanos() / 1000;
        Assertions.assertThat(defaultAt11).isEqualTo(expectedAt11);

        long expectedAt12 = Duration.ofHours(123L).plus(123456L, ChronoUnit.MICROS).toNanos() / 1000;
        Assertions.assertThat(defaultAt12).isEqualTo(expectedAt12);

        // NOTE(review): the MySQLConnection created below is never closed in this method;
        // confirm whether databaseAsserts() opens a real connection that should be released.
        Assertions.assertThat(defaultAt9).isEqualTo(
                MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName())
                        .databaseAsserts()
                        .currentDateTimeDefaultOptional(
                                ZonedTimestamp.toIsoString(
                                        ZonedDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC),
                                        ZoneOffset.UTC,
                                        MySqlValueConverters::adjustTemporal)));

        assertEmptyFieldValue(firstRecord, "K");
    }

    /**
     * Checks the schema default values of the temporal columns of {@code DATE_TIME_TABLE} when the
     * connector runs with {@code TemporalPrecisionMode.CONNECT}.
     *
     * <p>Only {@code DATE_TIME_TABLE} is included, and seven records are consumed. The defaults of
     * the fields at indexes 0-8 of the struct schema at index 1 of the record value are read up
     * front. In this mode most of them are compared against {@link Date} instances; index 1 is
     * compared against an ISO string and index 5 against the integer {@code 2001}.
     *
     * @throws Exception propagated from the test-harness calls used here
     */
    @Test
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void timeTypeWithConnectMode() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("DATE_TIME_TABLE"))
                .with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.CONNECT)
                .with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
                .build();
        start(MySqlConnector.class, this.config);

        SourceRecord firstRecord = consumeRecordsByTopic(7)
                .recordsForTopic(this.DATABASE.topicForTable("DATE_TIME_TABLE"))
                .get(0);
        validate(firstRecord);

        // Schema of the struct stored at index 1 of the record value.
        Schema rowSchema = firstRecord.valueSchema().fields().get(1).schema();
        Object defaultAt0 = rowSchema.fields().get(0).schema().defaultValue();
        Object defaultAt1 = rowSchema.fields().get(1).schema().defaultValue();
        Object defaultAt2 = rowSchema.fields().get(2).schema().defaultValue();
        Object defaultAt3 = rowSchema.fields().get(3).schema().defaultValue();
        Object defaultAt4 = rowSchema.fields().get(4).schema().defaultValue();
        Object defaultAt5 = rowSchema.fields().get(5).schema().defaultValue();
        Object defaultAt6 = rowSchema.fields().get(6).schema().defaultValue();
        Object defaultAt7 = rowSchema.fields().get(7).schema().defaultValue();
        Object defaultAt8 = rowSchema.fields().get(8).schema().defaultValue();

        Date expectedAt0 = Date.from(
                LocalDate.parse("1976-08-23", DateTimeFormatter.ofPattern("yyyy-MM-dd"))
                        .atStartOfDay()
                        .toInstant(ZoneOffset.UTC));
        Assertions.assertThat(defaultAt0).isEqualTo(expectedAt0);

        // The expected string is built in the JVM's default zone.
        ZoneId systemZone = ZoneId.systemDefault();
        String expectedAt1 = ZonedTimestamp.toIsoString(
                Timestamp.valueOf("1970-01-01 00:00:01").toInstant().atZone(systemZone),
                ZoneId.systemDefault(),
                MySqlValueConverters::adjustTemporal);
        Assertions.assertThat(defaultAt1).isEqualTo(expectedAt1);

        Date expectedAt2 = new Date(io.debezium.time.Timestamp.toEpochMillis(
                LocalDateTime.parse("2018-01-03 00:00:10", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
                MySqlValueConverters::adjustTemporal));
        Assertions.assertThat(defaultAt2).isEqualTo(expectedAt2);

        Date expectedAt3 = new Date(io.debezium.time.Timestamp.toEpochMillis(
                LocalDateTime.parse("2018-01-03 00:00:10.7", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S")),
                MySqlValueConverters::adjustTemporal));
        Assertions.assertThat(defaultAt3).isEqualTo(expectedAt3);

        Date expectedAt4 = new Date(io.debezium.time.Timestamp.toEpochMillis(
                LocalDateTime.parse("2018-01-03 00:00:10.123456", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")),
                MySqlValueConverters::adjustTemporal));
        Assertions.assertThat(defaultAt4).isEqualTo(expectedAt4);

        Assertions.assertThat(defaultAt5).isEqualTo(2001);

        Date expectedAt6 = new Date(io.debezium.time.Timestamp.toEpochMillis(
                Time.valueOf("00:00:00").toLocalTime(),
                MySqlValueConverters::adjustTemporal));
        Assertions.assertThat(defaultAt6).isEqualTo(expectedAt6);

        // Time-of-day defaults are expressed as the elapsed duration since midnight.
        Duration sinceMidnightAt7 = Duration.between(
                LocalTime.MIN,
                LocalTime.parse("23:00:00.7", DateTimeFormatter.ofPattern("HH:mm:ss.S")));
        Assertions.assertThat(defaultAt7).isEqualTo(new Date(io.debezium.time.Time.toMilliOfDay(sinceMidnightAt7, false)));

        Duration sinceMidnightAt8 = Duration.between(
                LocalTime.MIN,
                LocalTime.parse("23:00:00.123456", DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS")));
        Assertions.assertThat(defaultAt8).isEqualTo(new Date(io.debezium.time.Time.toMilliOfDay(sinceMidnightAt8, false)));

        assertEmptyFieldValue(firstRecord, "K");
    }

    /**
     * Verifies that changing both the type and the default of a column is reflected in the schema
     * of subsequently emitted records (DBZ-771).
     *
     * <p>After the snapshot the field at index 1 of the nested struct schema for
     * {@code DBZ_771_CUSTOMERS} must default to {@code "b2c"}. The test then alters
     * {@code customer_type} to {@code int default 42}, inserts a row, consumes two records and
     * expects the first record on the table's topic to carry the default {@code 42}.
     *
     * <p>Both the test database handle and the JDBC connection are managed with
     * try-with-resources, so each is closed exactly once and a failure while closing is attached
     * to the primary failure as a suppressed exception rather than replacing it.
     *
     * @throws Exception propagated from the test harness or from the JDBC calls
     */
    @Test
    @FixFor({"DBZ-771"})
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void columnTypeAndDefaultValueChange() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, this.config);

        SourceRecord snapshotRecord = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(this.DATABASE.topicForTable("DBZ_771_CUSTOMERS"))
                .get(0);
        validate(snapshotRecord);
        Assertions.assertThat(snapshotRecord.valueSchema().fields().get(1).schema().fields().get(1).schema().defaultValue())
                .isEqualTo("b2c");

        try (MySQLConnection db = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName())) {
            try (JdbcConnection connection = db.connect()) {
                connection.execute("SET binlog_rows_query_log_events=ON");
                connection.execute("alter table DBZ_771_CUSTOMERS change customer_type customer_type int default 42;");
                connection.execute("insert into DBZ_771_CUSTOMERS (id) values (2);");
            }

            // The JDBC connection is closed before the resulting change events are consumed.
            SourceRecord changedRecord = consumeRecordsByTopic(2)
                    .recordsForTopic(this.DATABASE.topicForTable("DBZ_771_CUSTOMERS"))
                    .get(0);
            validate(changedRecord);
            Assertions.assertThat(changedRecord.valueSchema().fields().get(1).schema().fields().get(1).schema().defaultValue())
                    .isEqualTo(42);
        }
    }

    /**
     * Verifies that changing a column's type without restating a default clears the default in
     * the schema of subsequently emitted records (DBZ-771, DBZ-1321).
     *
     * <p>After the snapshot the field at index 1 of the nested struct schema for
     * {@code DBZ_771_CUSTOMERS} must default to {@code "b2c"}. The test then changes
     * {@code customer_type} to a plain {@code int}, inserts a row, toggles the column between
     * {@code null} and {@code not null}, consumes four records and expects the first record on the
     * table's topic to carry no default.
     *
     * <p>Both the test database handle and the JDBC connection are managed with
     * try-with-resources, so each is closed exactly once and a failure while closing is attached
     * to the primary failure as a suppressed exception rather than replacing it.
     *
     * @throws Exception propagated from the test harness or from the JDBC calls
     */
    @Test
    @FixFor({"DBZ-771", "DBZ-1321"})
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void columnTypeChangeResetsDefaultValue() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(MySqlConnector.class, this.config);

        SourceRecord snapshotRecord = consumeRecordsByTopic(EVENT_COUNT)
                .recordsForTopic(this.DATABASE.topicForTable("DBZ_771_CUSTOMERS"))
                .get(0);
        validate(snapshotRecord);
        Assertions.assertThat(snapshotRecord.valueSchema().fields().get(1).schema().fields().get(1).schema().defaultValue())
                .isEqualTo("b2c");

        try (MySQLConnection db = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName())) {
            try (JdbcConnection connection = db.connect()) {
                connection.execute("SET binlog_rows_query_log_events=ON");
                connection.execute("alter table DBZ_771_CUSTOMERS change customer_type customer_type int;");
                connection.execute("insert into DBZ_771_CUSTOMERS (id, customer_type) values (2, 456);");
                connection.execute("alter table DBZ_771_CUSTOMERS modify customer_type int null;");
                connection.execute("alter table DBZ_771_CUSTOMERS modify customer_type int not null;");
            }

            // The JDBC connection is closed before the resulting change events are consumed.
            SourceRecord changedRecord = consumeRecordsByTopic(4)
                    .recordsForTopic(this.DATABASE.topicForTable("DBZ_771_CUSTOMERS"))
                    .get(0);
            validate(changedRecord);
            Assertions.assertThat(changedRecord.valueSchema().fields().get(1).schema().fields().get(1).schema().defaultValue())
                    .isNull();
        }
    }

    /**
     * Verifies the schema default reported for a {@code timestamp not null default
     * current_timestamp} column that is added with {@code ALTER TABLE} after the snapshot
     * (DBZ-2267).
     *
     * <p>The connector is limited to {@code ALTER_DATE_TIME} with schema change events disabled.
     * Once the snapshot has completed, the table is created, the {@code CREATED} column is added
     * and one row is inserted. The single consumed record must expose
     * {@code "1970-01-01T00:00:00Z"} as the default of the field at index 1 of the nested struct
     * schema.
     *
     * <p>Both the test database handle and the JDBC connection are managed with
     * try-with-resources, so each is closed exactly once and a failure while closing is attached
     * to the primary failure as a suppressed exception rather than replacing it.
     *
     * @throws Exception propagated from the test harness or from the JDBC calls
     */
    @Test
    @FixFor({"DBZ-2267"})
    public void alterDateAndTimeTest() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("ALTER_DATE_TIME"))
                .with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .build();
        start(MySqlConnector.class, this.config);
        waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        // Kept from the original test; presumably turns on the harness's diagnostic printing.
        Testing.Print.enable();

        try (MySQLConnection db = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName())) {
            try (JdbcConnection connection = db.connect()) {
                connection.execute("create table ALTER_DATE_TIME (ID int primary key);");
                connection.execute("alter table ALTER_DATE_TIME add column CREATED timestamp not null default current_timestamp");
                connection.execute("insert into ALTER_DATE_TIME values(1000, default);");
            }

            // The JDBC connection is closed before the resulting change event is consumed.
            SourceRecord insertedRecord = consumeRecordsByTopic(1).allRecordsInOrder().get(0);
            validate(insertedRecord);
            Assertions.assertThat(insertedRecord.valueSchema().fields().get(1).schema().fields().get(1).schema().defaultValue())
                    .isEqualTo("1970-01-01T00:00:00Z");
        }
    }
}
