package io.debezium.connector.vitess;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.vitess.AbstractVitessConnectorTest;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.connector.vitess.VitessOffsetContext;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.TableId;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import junit.framework.TestCase;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/vitess/VitessConnectorIT.class */
public class VitessConnectorIT extends AbstractVitessConnectorTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(VitessConnectorIT.class);
    private AbstractVitessConnectorTest.TestConsumer consumer;
    private VitessConnector connector;

    @Before
    public void before() {
        Testing.Print.enable();
    }

    @After
    public void after() {
        stopConnector();
        assertConnectorNotRunning();
    }

    @Test
    public void shouldValidateConnectorConfigDef() {
        this.connector = new VitessConnector();
        Assertions.assertThat(this.connector.config()).isNotNull();
        VitessConnectorConfig.ALL_FIELDS.forEach(this::validateFieldDef);
    }

    @Test
    public void shouldNotStartWithInvalidConfiguration() {
        Configuration build = Configuration.create().build();
        LOGGER.info("Attempting to start the connector with an INVALID configuration, so MULTIPLE error messages & one exceptions will appear in the log");
        start(VitessConnector.class, build, (z, str, th) -> {
            Assertions.assertThat(z).isFalse();
            Assertions.assertThat(th).isNotNull();
        });
    }

    @Test
    public void shouldValidateMinimalConfiguration() {
        Configuration build = TestHelper.defaultConfig().build();
        VitessConnector vitessConnector = new VitessConnector();
        vitessConnector.start(build.asMap());
        vitessConnector.validate(build.asMap()).configValues().forEach(configValue -> {
            Assert.assertTrue("Unexpected error for: " + configValue.name(), configValue.errorMessages().isEmpty());
        });
    }

    @Test
    @FixFor({"DBZ-2776"})
    public void shouldReceiveChangesForInsertsWithDifferentDataTypes() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector();
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        this.consumer.expects(1);
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), "id");
        this.consumer.expects(1);
        assertInsert("INSERT INTO string_table (char_col,varchar_col,varchar_ko_col,varchar_ja_col,tinytext_col,text_col,mediumtext_col,longtext_col,json_col) VALUES ('a', 'bc', '상품 명1', 'リンゴ', 'gh', 'ij', 'kl', 'mn', '{\"key1\": \"value1\", \"key2\": {\"key21\": \"value21\", \"key22\": \"value22\"}}');", schemasAndValuesForStringTypes(), "id");
        this.consumer.expects(1);
        assertInsert("INSERT INTO string_table (binary_col,varbinary_col,blob_col,mediumblob_col) VALUES ('d', 'ef', 'op', 'qs');", schemasAndValuesForBytesTypesAsBytes(), "id");
        this.consumer.expects(1);
        assertInsert("INSERT INTO enum_table (enum_col) VALUES ('large');", schemasAndValuesForEnumType(), "id");
        this.consumer.expects(1);
        assertInsert("INSERT INTO set_table (set_col) VALUES ('a,c');", schemasAndValuesForSetType(), "id");
        this.consumer.expects(1);
        assertInsert("INSERT INTO time_table (time_col,date_col,datetime_col,timestamp_col,year_col) VALUES ('01:02:03', '2020-02-11', '2020-02-12 01:02:03', '2020-02-13 01:02:03', '2020')", schemasAndValuesForTimeType(), "id");
    }

    @Test
    public void shouldReceiveBytesAsBytes() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.BINARY_HANDLING_MODE, CommonConnectorConfig.BinaryHandlingMode.BYTES);
        }, false);
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        this.consumer.expects(1);
        assertInsert("INSERT INTO string_table (binary_col,varbinary_col,blob_col,mediumblob_col) VALUES ('d', 'ef', 'op', 'qs');", schemasAndValuesForBytesTypesAsBytes(), "id");
    }

    @Test
    public void shouldReceiveBytesAsBase64String() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector(builder -> {
            return builder.with(VitessConnectorConfig.BINARY_HANDLING_MODE, CommonConnectorConfig.BinaryHandlingMode.BASE64);
        }, false);
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        this.consumer.expects(1);
        assertInsert("INSERT INTO string_table (binary_col,varbinary_col,blob_col,mediumblob_col) VALUES ('d', 'ef', 'op', 'qs');", schemasAndValuesForBytesTypesAsBase64String(), "id");
    }

    @Test
    public void shouldReceiveBytesAsHexString() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector(builder -> {
            return builder.with(VitessConnectorConfig.BINARY_HANDLING_MODE, CommonConnectorConfig.BinaryHandlingMode.HEX);
        }, false);
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        this.consumer.expects(1);
        assertInsert("INSERT INTO string_table (binary_col,varbinary_col,blob_col,mediumblob_col) VALUES ('d', 'ef', 'op', 'qs');", schemasAndValuesForBytesTypesAsHexString(), "id");
    }

    @Test
    public void shouldOffsetIncrementAfterDDL() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector();
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        SourceRecord assertInsert = assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), "id");
        TestHelper.execute("ALTER TABLE numeric_table ADD foo INT default 10;");
        this.consumer.expects(1);
        List<AbstractVitessConnectorTest.SchemaAndValueField> schemasAndValuesForNumericTypes = schemasAndValuesForNumericTypes();
        schemasAndValuesForNumericTypes.add(new AbstractVitessConnectorTest.SchemaAndValueField("foo", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10));
        Assert.assertEquals(AbstractVitessConnectorTest.RecordOffset.fromSourceInfo(assertInsert).incrementOffset(1 + 1).getVgtid(), (String) assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes, "id").sourceOffset().get("vgtid"));
    }

    @Test
    @FixFor({"DBZ-4353"})
    public void shouldSchemaUpdatedAfterOnlineDdl() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector();
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), "id");
        String applyOnlineDdl = TestHelper.applyOnlineDdl("ALTER TABLE numeric_table ADD COLUMN foo INT", TestHelper.TEST_UNSHARDED_KEYSPACE);
        Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())).pollInterval(Duration.ofSeconds(1L)).until(() -> {
            return Boolean.valueOf(TestHelper.checkOnlineDDL(TestHelper.TEST_UNSHARDED_KEYSPACE, applyOnlineDdl));
        });
        List<AbstractVitessConnectorTest.SchemaAndValueField> schemasAndValuesForNumericTypes = schemasAndValuesForNumericTypes();
        schemasAndValuesForNumericTypes.add(new AbstractVitessConnectorTest.SchemaAndValueField("foo", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10));
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col,foo) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true, 10);", schemasAndValuesForNumericTypes, "id");
    }

    @Test
    public void shouldSameTransactionLastRowOffsetBeNewVgtid() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector();
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        SourceRecord assertInsert = assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), "id");
        ArrayList arrayList = new ArrayList(2);
        IntStream.rangeClosed(1, 2).forEach(i -> {
            arrayList.add("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);");
        });
        this.consumer.expects(2);
        TableId tableIdFromInsertStmt = tableIdFromInsertStmt("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);");
        executeAndWait(arrayList);
        for (int i2 = 1; i2 <= 2; i2++) {
            SourceRecord assertRecordInserted = assertRecordInserted(topicNameFromInsertStmt("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);"), "id");
            if (i2 != 2) {
                assertRecordOffset(assertRecordInserted, AbstractVitessConnectorTest.RecordOffset.fromSourceInfo(assertInsert));
            } else {
                assertRecordOffset(assertRecordInserted, AbstractVitessConnectorTest.RecordOffset.fromSourceInfo(assertRecordInserted));
            }
            assertSourceInfo(assertRecordInserted, "test_server", TestHelper.TEST_UNSHARDED_KEYSPACE, tableIdFromInsertStmt.table());
            assertRecordSchemaAndValues(schemasAndValuesForNumericTypes(), assertRecordInserted, "after");
        }
    }

    @Test
    public void shouldMultipleRowsInSameStmtLastRowOffsetBeNewVgtid() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector();
        assertConnectorIsRunning();
        this.consumer = testConsumer(2, new String[0]);
        Vgtid currentVgtid = TestHelper.getCurrentVgtid();
        executeAndWait("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true), (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);");
        TableId tableIdFromInsertStmt = tableIdFromInsertStmt("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true), (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);");
        for (int i = 1; i <= 2; i++) {
            SourceRecord assertRecordInserted = assertRecordInserted(topicNameFromInsertStmt("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true), (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);"), "id");
            if (i != 2) {
                assertRecordOffset(assertRecordInserted, new AbstractVitessConnectorTest.RecordOffset(currentVgtid.toString()));
            } else {
                String vgtid = AbstractVitessConnectorTest.RecordOffset.fromSourceInfo(assertRecordInserted).getVgtid();
                Assertions.assertThat(vgtid).isNotNull();
                Assertions.assertThat(vgtid).isNotEqualTo(currentVgtid.toString());
            }
            assertSourceInfo(assertRecordInserted, "test_server", TestHelper.TEST_UNSHARDED_KEYSPACE, tableIdFromInsertStmt.table());
            assertRecordSchemaAndValues(schemasAndValuesForNumericTypes(), assertRecordInserted, "after");
        }
    }

    @Test
    public void shouldUsePrevVgtidAsOffsetWhenNoVgtidInGrpcResponse() throws Exception {
        Testing.Print.disable();
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector();
        assertConnectorIsRunning();
        Vgtid currentVgtid = TestHelper.getCurrentVgtid();
        this.consumer = testConsumer(10000, new String[0]);
        StringBuilder append = new StringBuilder().append("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES " + "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)");
        for (int i = 1; i < 10000; i++) {
            append.append(", ").append("(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)");
        }
        try {
            executeAndWait(append.toString());
            for (int i2 = 1; i2 <= 10000; i2++) {
                SourceRecord assertRecordInserted = assertRecordInserted("test_unsharded_keyspace.numeric_table", "id");
                if (i2 != 10000) {
                    assertRecordOffset(assertRecordInserted, new AbstractVitessConnectorTest.RecordOffset(currentVgtid.toString()));
                } else {
                    String vgtid = AbstractVitessConnectorTest.RecordOffset.fromSourceInfo(assertRecordInserted).getVgtid();
                    Assertions.assertThat(vgtid).isNotNull();
                    Assertions.assertThat(vgtid).isNotEqualTo(currentVgtid.toString());
                }
            }
            Testing.Print.enable();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    @FixFor({"DBZ-5063"})
    public void shouldUseSameTransactionIdWhenMultiGrpcResponses() throws Exception {
        Testing.Print.disable();
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true);
        }, false);
        assertConnectorIsRunning();
        Vgtid currentVgtid = TestHelper.getCurrentVgtid();
        this.consumer = testConsumer(10000 + 2 + 4, new String[0]);
        String str = "INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES " + "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)";
        StringBuilder append = new StringBuilder().append(str);
        for (int i = 1; i < 10000; i++) {
            append.append(", ").append("(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)");
        }
        StringBuilder append2 = new StringBuilder().append(str);
        for (int i2 = 1; i2 < 2; i2++) {
            append2.append(", ").append("(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)");
        }
        String sb = append.toString();
        String sb2 = append2.toString();
        try {
            TestHelper.execute(sb);
            executeAndWait(sb2);
            String assertRecordBegin = assertRecordBegin();
            for (int i3 = 1; i3 <= 10000; i3++) {
                String string = ((Struct) assertRecordInserted("test_unsharded_keyspace.numeric_table", "id").value()).getStruct("transaction").getString("id");
                Assertions.assertThat(string).isNotNull();
                Assertions.assertThat(string).isEqualTo(assertRecordBegin);
                Assertions.assertThat(Vgtid.of(string)).isNotEqualTo(currentVgtid);
            }
            assertRecordEnd(assertRecordBegin, 10000);
            String assertRecordBegin2 = assertRecordBegin();
            for (int i4 = 1; i4 <= 2; i4++) {
                String string2 = ((Struct) assertRecordInserted("test_unsharded_keyspace.numeric_table", "id").value()).getStruct("transaction").getString("id");
                Assertions.assertThat(string2).isNotNull();
                Assertions.assertThat(string2).isEqualTo(assertRecordBegin2);
                Assertions.assertThat(Vgtid.of(string2)).isNotEqualTo(Vgtid.of(assertRecordBegin));
            }
            assertRecordEnd(assertRecordBegin2, 2);
            Testing.Print.enable();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void shouldMultiShardSubscriptionHaveMultiShardGtidsInVgtid() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl", TestHelper.TEST_SHARDED_KEYSPACE);
        TestHelper.applyVSchema("vitess_vschema.json");
        startConnector(true);
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), TestHelper.TEST_SHARDED_KEYSPACE, "id", true);
    }

    @Test
    public void shouldMultiShardConfigSubscriptionHaveMultiShardGtidsInVgtid() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl", TestHelper.TEST_SHARDED_KEYSPACE);
        TestHelper.applyVSchema("vitess_vschema.json");
        startConnector(true, "-80,80-");
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), TestHelper.TEST_SHARDED_KEYSPACE, "id", true);
    }

    @Test
    public void shouldMultiShardMultiTaskConfigSubscriptionHaveMultiShardGtidsInVgtid() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl", TestHelper.TEST_SHARDED_KEYSPACE);
        TestHelper.applyVSchema("vitess_vschema.json");
        startConnector(Function.identity(), true, true, 2, 0, 1, null, null, null);
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), TestHelper.TEST_SHARDED_KEYSPACE, "id", true);
    }

    @Test
    @FixFor({"DBZ-2578"})
    public void shouldUseMultiColumnPkAsRecordKey() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl", TestHelper.TEST_SHARDED_KEYSPACE);
        TestHelper.applyVSchema("vitess_vschema.json");
        startConnector(true);
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        executeAndWait("INSERT INTO comp_pk_table (int_col, int_col2) VALUES (1, 2);", TestHelper.TEST_SHARDED_KEYSPACE);
        SourceRecord remove = this.consumer.remove();
        String str = topicNameFromInsertStmt("INSERT INTO comp_pk_table (int_col, int_col2) VALUES (1, 2);", TestHelper.TEST_SHARDED_KEYSPACE);
        TableId tableIdFromInsertStmt = tableIdFromInsertStmt("INSERT INTO comp_pk_table (int_col, int_col2) VALUES (1, 2);", TestHelper.TEST_SHARDED_KEYSPACE);
        assertRecordInserted(remove, str, "id");
        assertRecordInserted(remove, str, "int_col");
        assertRecordOffset(remove, true);
        assertSourceInfo(remove, "test_server", TestHelper.TEST_SHARDED_KEYSPACE, tableIdFromInsertStmt.table());
    }

    @Test
    @FixFor({"DBZ-2578"})
    public void shouldUseUniqueKeyAsRecordKey() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(VitessReplicationConnection.class);
        TestHelper.executeDDL("vitess_create_tables.ddl", TestHelper.TEST_SHARDED_KEYSPACE);
        TestHelper.applyVSchema("vitess_vschema.json");
        startConnector(true);
        assertConnectorIsRunning();
        waitForShardedGtidAcquiring(logInterceptor);
        this.consumer = testConsumer(1, new String[0]);
        assertInsert("INSERT INTO no_pk_multi_unique_keys_table (int_col, int_col2) VALUES (1, 2);", null, TestHelper.TEST_SHARDED_KEYSPACE, "int_col", true);
        this.consumer.expects(1);
        assertInsert("INSERT INTO no_pk_multi_comp_unique_keys_table (int_col, int_col2, int_col3, int_col4, int_col5) VALUES (1, 2, 3, 4, 5);", null, TestHelper.TEST_SHARDED_KEYSPACE, "int_col3", true);
    }

    @Test
    @FixFor({"DBZ-2578"})
    public void shouldNotHaveRecordKeyIfNoPrimaryKeyUniqueKey() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl", TestHelper.TEST_SHARDED_KEYSPACE);
        TestHelper.applyVSchema("vitess_vschema.json");
        startConnector(true);
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        assertInsert("INSERT INTO no_pk_table (int_col) VALUES (1);", null, TestHelper.TEST_SHARDED_KEYSPACE, null, true);
    }

    @Test
    @FixFor({"DBZ-2578"})
    public void shouldPrioritizePrimaryKeyAsRecordKey() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl", TestHelper.TEST_SHARDED_KEYSPACE);
        TestHelper.applyVSchema("vitess_vschema.json");
        startConnector(true);
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        assertInsert("INSERT INTO pk_single_unique_key_table (int_col) VALUES (1);", null, TestHelper.TEST_SHARDED_KEYSPACE, "id", true);
    }

    @Test
    @FixFor({"DBZ-2836"})
    public void shouldTaskFailIfColumnNameInvalid() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        start(VitessConnector.class, TestHelper.defaultConfig().build(), (z, str, th) -> {
            if (th != null) {
                countDownLatch.countDown();
            } else {
                Assert.fail("A controlled exception was expected....");
            }
        });
        assertConnectorIsRunning();
        waitForStreamingRunning((String) null);
        TestHelper.execute("ALTER TABLE numeric_table ADD `@1` INT;");
        TestHelper.execute("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);");
        if (countDownLatch.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)) {
            return;
        }
        Assert.fail("did not reach stop condition in time");
    }

    @Test
    @FixFor({"DBZ-2852"})
    public void shouldTaskFailIfUsernamePasswordInvalid() throws InterruptedException {
        Configuration.Builder with = TestHelper.defaultConfig().with(VitessConnectorConfig.VTGATE_USER, "incorrect_username").with(VitessConnectorConfig.VTGATE_PASSWORD, "incorrect_password");
        HashMap hashMap = new HashMap();
        start(VitessConnector.class, with.build(), (z, str, th) -> {
            hashMap.put("success", Boolean.valueOf(z));
            hashMap.put("message", str);
            hashMap.put("error", th);
        });
        TestCase.assertEquals(false, hashMap.get("success"));
        Assertions.assertThat(hashMap.get("message").toString().contains("Connector configuration is not valid. Unable to connect: "));
        TestCase.assertEquals((Object) null, hashMap.get("error"));
    }

    @Test
    @FixFor({"DBZ-2851"})
    public void shouldSanitizeFieldNames() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        TestHelper.execute("ALTER TABLE numeric_table ADD `-foo-` INT default 10;");
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.FIELD_NAME_ADJUSTMENT_MODE, "avro");
        }, false);
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        List<AbstractVitessConnectorTest.SchemaAndValueField> schemasAndValuesForNumericTypes = schemasAndValuesForNumericTypes();
        schemasAndValuesForNumericTypes.add(new AbstractVitessConnectorTest.SchemaAndValueField("_foo_", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10));
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes, "id");
    }

    @Test
    @FixFor({"DBZ-2906"})
    public void shouldSanitizeDecimalValue() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        TestHelper.execute("ALTER TABLE numeric_table ADD decimal_col2 DECIMAL(14, 4) DEFAULT 12.3400;");
        TestHelper.execute("ALTER TABLE numeric_table ADD decimal_col3 DECIMAL(14, 4) DEFAULT -12.3400;");
        startConnector();
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        List<AbstractVitessConnectorTest.SchemaAndValueField> schemasAndValuesForNumericTypes = schemasAndValuesForNumericTypes();
        schemasAndValuesForNumericTypes.add(new AbstractVitessConnectorTest.SchemaAndValueField("decimal_col2", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "12.3400"));
        schemasAndValuesForNumericTypes.add(new AbstractVitessConnectorTest.SchemaAndValueField("decimal_col3", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "-12.3400"));
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes, "id");
    }

    @Test
    @FixFor({"DBZ-3668"})
    public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(VitessReplicationConnection.class);
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector();
        waitForGtidAcquiring(logInterceptor);
        this.consumer = testConsumer(1, new String[0]);
        executeAndWait("INSERT INTO no_pk_table (id,int_col) values (1001, 1)");
        SourceRecord remove = this.consumer.remove();
        CloudEventsConverterTest.shouldConvertToCloudEventsInJson(remove, false);
        CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(remove, false);
        CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(remove, "vitess", "test_server", false);
        this.consumer = testConsumer(1, new String[0]);
        executeAndWait("INSERT INTO no_pk_table (id,int_col) values (1002, 2)");
        SourceRecord remove2 = this.consumer.remove();
        CloudEventsConverterTest.shouldConvertToCloudEventsInJson(remove2, false, jsonNode -> {
            Assertions.assertThat(jsonNode.get("id").asText()).contains(new CharSequence[]{"vgtid:"});
        });
        CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(remove2, false);
        CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(remove2, "vitess", "test_server", false);
    }

    @Test
    public void testNoPerTaskOffsetStorage() throws Exception {
        testOffsetStorage(false);
    }

    @Test
    public void testPerTaskOffsetStorage() throws Exception {
        testOffsetStorage(true);
    }

    @Test
    public void testTableIncludeFilter() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector(Function.identity(), false, false, 1, -1, -1, "test_unsharded_keyspace.numeric_table", "");
        assertConnectorIsRunning();
        this.consumer = testConsumer(1, new String[0]);
        TestHelper.execute("INSERT INTO string_table (char_col,varchar_col,varchar_ko_col,varchar_ja_col,tinytext_col,text_col,mediumtext_col,longtext_col,json_col) VALUES ('a', 'bc', '상품 명1', 'リンゴ', 'gh', 'ij', 'kl', 'mn', '{\"key1\": \"value1\", \"key2\": {\"key21\": \"value21\", \"key22\": \"value22\"}}');", TestHelper.TEST_UNSHARDED_KEYSPACE);
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), "id");
    }

    @Test
    public void testGetVitessShards() throws Exception {
        TestCase.assertEquals(new HashSet(Arrays.asList(TestHelper.TEST_SHARD)), new HashSet(VitessConnector.getVitessShards(new VitessConnectorConfig(TestHelper.defaultConfig().build()))));
    }

    @Test
    public void testGetKeyspaceTables() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        TestCase.assertEquals(new HashSet(Arrays.asList("my_seq", "t1", "numeric_table", "string_table", "enum_table", "set_table", "time_table", "no_pk_table", "pk_single_unique_key_table", "no_pk_multi_unique_keys_table", "no_pk_multi_comp_unique_keys_table", "comp_pk_table")), (Set) new HashSet(VitessConnector.getKeyspaceTables(new VitessConnectorConfig(TestHelper.defaultConfig().build()))).stream().filter(str -> {
            return !str.startsWith("_");
        }).collect(Collectors.toSet()));
    }

    @Test
    public void testCopyAndReplicateTable() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        TestHelper.execute("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", TestHelper.TEST_UNSHARDED_KEYSPACE);
        startConnector(Function.identity(), false, false, 1, -1, -1, "test_unsharded_keyspace.numeric_table", null, TestHelper.TEST_SHARD);
        this.consumer = testConsumer(1, new String[0]);
        this.consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        SourceRecord assertRecordInserted = assertRecordInserted(topicNameFromInsertStmt("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);"), "id");
        assertSourceInfo(assertRecordInserted, "test_server", TestHelper.TEST_UNSHARDED_KEYSPACE, "numeric_table");
        assertRecordSchemaAndValues(schemasAndValuesForNumericTypes(), assertRecordInserted, "after");
        this.consumer.expects(1);
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), "id");
    }

    @Test
    public void testCopyNoRecordsAndReplicateTable() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        startConnector(Function.identity(), false, false, 1, -1, -1, "test_unsharded_keyspace.numeric_table", null, null);
        this.consumer = testConsumer(1, new String[0]);
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), "id");
    }

    @Test
    public void testInitialSnapshotModeHaveMultiShard() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl", TestHelper.TEST_SHARDED_KEYSPACE);
        TestHelper.applyVSchema("vitess_vschema.json");
        TestHelper.execute("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", TestHelper.TEST_SHARDED_KEYSPACE);
        startConnector(Function.identity(), true, false, 1, -1, -1, null, null, null);
        this.consumer = testConsumer(1, new String[0]);
        this.consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        SourceRecord assertRecordInserted = assertRecordInserted(topicNameFromInsertStmt("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", TestHelper.TEST_SHARDED_KEYSPACE), "id");
        assertSourceInfo(assertRecordInserted, "test_server", TestHelper.TEST_SHARDED_KEYSPACE, "numeric_table");
        assertRecordSchemaAndValues(schemasAndValuesForNumericTypes(), assertRecordInserted, "after");
        this.consumer.expects(1);
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), TestHelper.TEST_SHARDED_KEYSPACE, "id", true);
    }

    @Test
    public void testCopyTableAndRestart() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        TestHelper.execute("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", TestHelper.TEST_UNSHARDED_KEYSPACE);
        startConnector(Function.identity(), false, false, 1, -1, -1, "test_unsharded_keyspace..*_table", null, null);
        this.consumer = testConsumer(1, new String[0]);
        this.consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        SourceRecord assertRecordInserted = assertRecordInserted(topicNameFromInsertStmt("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);"), "id");
        assertSourceInfo(assertRecordInserted, "test_server", TestHelper.TEST_UNSHARDED_KEYSPACE, "numeric_table");
        assertRecordSchemaAndValues(schemasAndValuesForNumericTypes(), assertRecordInserted, "after");
        stopConnector();
        startConnector(Function.identity(), false, false, 1, -1, -1, null, null, null);
        this.consumer = testConsumer(1, new String[0]);
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), "id");
    }

    @Test
    public void testCopyAndReplicatePerTaskOffsetStorage() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl");
        TestHelper.execute("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", TestHelper.TEST_UNSHARDED_KEYSPACE);
        startConnector(Function.identity(), false, true, 1, 0, 1, "test_unsharded_keyspace.numeric_table", null, null);
        this.consumer = testConsumer(1, new String[0]);
        this.consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        SourceRecord assertRecordInserted = assertRecordInserted(topicNameFromInsertStmt("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);"), "id");
        assertSourceInfo(assertRecordInserted, "test_server", TestHelper.TEST_UNSHARDED_KEYSPACE, "numeric_table");
        assertRecordSchemaAndValues(schemasAndValuesForNumericTypes(), assertRecordInserted, "after");
        this.consumer.expects(1);
        assertInsert("INSERT INTO numeric_table (tinyint_col,tinyint_unsigned_col,smallint_col,smallint_unsigned_col,mediumint_col,mediumint_unsigned_col,int_col,int_unsigned_col,bigint_col,bigint_unsigned_col,bigint_unsigned_overflow_col,float_col,double_col,decimal_col,boolean_col) VALUES (1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true);", schemasAndValuesForNumericTypes(), "id");
    }

    private void testOffsetStorage(boolean z) throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl", TestHelper.TEST_UNSHARDED_KEYSPACE);
        Configuration build = TestHelper.defaultConfig(false, z, 1, 0, 1, null, VitessConnectorConfig.SnapshotMode.NEVER, TestHelper.TEST_SHARD).build();
        String string = build.getString(CommonConnectorConfig.TOPIC_PREFIX);
        Map hashMapOf = Collect.hashMapOf("server", string);
        if (z) {
            hashMapOf.put("task_key", VitessConnector.getTaskKeyName(0, 1, 0));
        }
        startConnector(Function.identity(), false, z, 1, 0, 1, null, "");
        this.consumer = testConsumer(1, new String[0]);
        executeAndWait("INSERT INTO pk_single_unique_key_table (id, int_col) VALUES (1, 1);", TestHelper.TEST_UNSHARDED_KEYSPACE);
        SourceRecord remove = this.consumer.remove();
        Map sourceOffset = remove.sourceOffset();
        Map sourcePartition = remove.sourcePartition();
        Testing.print(String.format("Offset: %s, partition: %s", sourceOffset, sourcePartition));
        Vgtid of = Vgtid.of((String) sourceOffset.get("vgtid"));
        TestCase.assertEquals(of.getShardGtids().size(), 1);
        TestCase.assertEquals(sourcePartition, hashMapOf);
        stopConnector();
        VitessOffsetContext.Loader loader = new VitessOffsetContext.Loader(new VitessConnectorConfig(Configuration.create().with(CommonConnectorConfig.TOPIC_PREFIX, string).build()));
        Map sourcePartition2 = new VitessPartition(string, z ? VitessConnector.getTaskKeyName(0, 1, 0) : null).getSourcePartition();
        Map readLastCommittedOffset = readLastCommittedOffset(build, sourcePartition2);
        Vgtid restartVgtid = loader.load(readLastCommittedOffset).getRestartVgtid();
        Testing.print(String.format("task: %d, Offset: %s", 0, readLastCommittedOffset));
        Testing.print(String.format("task: %d, vgtid: %s", 0, of));
        TestCase.assertEquals(sourceOffset, readLastCommittedOffset);
        TestCase.assertEquals(sourcePartition, sourcePartition2);
        TestCase.assertEquals(of, restartVgtid);
        Testing.print("*** Done with verifying without offset.storage.per.task");
    }

    private void waitForGtidAcquiring(LogInterceptor logInterceptor) {
        Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())).until(() -> {
            return Boolean.valueOf(logInterceptor.containsMessage("set to the GTID [current] for keyspace"));
        });
    }

    private void waitForShardedGtidAcquiring(LogInterceptor logInterceptor) {
        Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())).until(() -> {
            return Boolean.valueOf(logInterceptor.containsMessage("Default VGTID '[{\"keyspace\":"));
        });
    }

    private void waitForVStreamStarted(LogInterceptor logInterceptor) {
        Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())).pollInterval(Duration.ofSeconds(1L)).until(() -> {
            return Boolean.valueOf(logInterceptor.containsMessage("Started VStream"));
        });
    }

    private void startConnector() throws InterruptedException {
        startConnector(false, TestHelper.TEST_SHARD);
    }

    private void startConnector(boolean z) throws InterruptedException {
        startConnector(Function.identity(), z, "");
    }

    private void startConnector(boolean z, String str) throws InterruptedException {
        startConnector(Function.identity(), z, str);
    }

    private void startConnector(Function<Configuration.Builder, Configuration.Builder> function, boolean z) throws InterruptedException {
        startConnector(function, z, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, "");
    }

    private void startConnector(Function<Configuration.Builder, Configuration.Builder> function, boolean z, String str) throws InterruptedException {
        startConnector(function, z, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, str);
    }

    private void startConnector(Function<Configuration.Builder, Configuration.Builder> function, boolean z, boolean z2, int i, int i2, int i3, String str, String str2) throws InterruptedException {
        startConnector(function, z, z2, i, i2, i3, str, VitessConnectorConfig.SnapshotMode.NEVER, str2);
    }

    private void startConnector(Function<Configuration.Builder, Configuration.Builder> function, boolean z, boolean z2, int i, int i2, int i3, String str, VitessConnectorConfig.SnapshotMode snapshotMode, String str2) throws InterruptedException {
        Configuration.Builder apply = function.apply(TestHelper.defaultConfig(z, z2, i, i2, i3, str, snapshotMode, str2));
        LogInterceptor logInterceptor = new LogInterceptor(VitessReplicationConnection.class);
        start(VitessConnector.class, apply.build());
        assertConnectorIsRunning();
        waitForStreamingRunning(z2 ? VitessConnector.getTaskKeyName(0, 1, i2) : null);
        waitForVStreamStarted(logInterceptor);
    }

    private void waitForStreamingRunning(String str) throws InterruptedException {
        waitForStreamingRunning(str, Module.name(), "test_server");
    }

    private SourceRecord assertInsert(String str, List<AbstractVitessConnectorTest.SchemaAndValueField> list, String str2) {
        return assertInsert(str, list, TestHelper.TEST_UNSHARDED_KEYSPACE, str2, false);
    }

    private SourceRecord assertInsert(String str, List<AbstractVitessConnectorTest.SchemaAndValueField> list, String str2, String str3, boolean z) {
        TableId tableIdFromInsertStmt = tableIdFromInsertStmt(str, str2);
        try {
            executeAndWait(str, str2);
            SourceRecord assertRecordInserted = assertRecordInserted(topicNameFromInsertStmt(str, str2), str3);
            assertRecordOffset(assertRecordInserted, z);
            assertSourceInfo(assertRecordInserted, "test_server", str2, tableIdFromInsertStmt.table());
            if (list != null && !list.isEmpty()) {
                assertRecordSchemaAndValues(list, assertRecordInserted, "after");
            }
            return assertRecordInserted;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private SourceRecord assertRecordInserted(String str) {
        Assert.assertFalse("records not generated", this.consumer.isEmpty());
        return assertRecordInserted(this.consumer.remove(), str);
    }

    private SourceRecord assertRecordInserted(String str, String str2) {
        Assert.assertFalse("records not generated", this.consumer.isEmpty());
        return assertRecordInserted(this.consumer.remove(), str, str2);
    }

    private SourceRecord assertRecordUpdated() {
        Assert.assertFalse("records not generated", this.consumer.isEmpty());
        return assertRecordUpdated(this.consumer.remove());
    }

    private SourceRecord assertRecordInserted(SourceRecord sourceRecord, String str) {
        TestCase.assertEquals(topicName(str), sourceRecord.topic());
        VerifyRecord.isValidInsert(sourceRecord);
        return sourceRecord;
    }

    private SourceRecord assertRecordInserted(SourceRecord sourceRecord, String str, String str2) {
        TestCase.assertEquals(topicName(str), sourceRecord.topic());
        if (str2 != null) {
            VitessVerifyRecord.isValidInsert(sourceRecord, str2);
        } else {
            VerifyRecord.isValidInsert(sourceRecord);
        }
        return sourceRecord;
    }

    private SourceRecord assertRecordUpdated(SourceRecord sourceRecord) {
        VerifyRecord.isValidUpdate(sourceRecord);
        return sourceRecord;
    }

    private String assertRecordBegin() {
        Assert.assertFalse("records not generated", this.consumer.isEmpty());
        Struct struct = (Struct) this.consumer.remove().value();
        Assertions.assertThat(struct.getString("status")).isEqualTo("BEGIN");
        return struct.getString("id");
    }

    private void assertRecordEnd(String str, long j) {
        Assert.assertFalse("records not generated", this.consumer.isEmpty());
        Struct struct = (Struct) this.consumer.remove().value();
        Assertions.assertThat(struct.getString("status")).isEqualTo("END");
        Assertions.assertThat(struct.getString("id")).isEqualTo(str);
        Assertions.assertThat(struct.getInt64("event_count")).isEqualTo(j);
    }

    private void executeAndWait(String str) throws Exception {
        executeAndWait(str, TestHelper.TEST_UNSHARDED_KEYSPACE);
    }

    private void executeAndWait(String str, String str2) throws Exception {
        executeAndWait(Collections.singletonList(str), str2);
    }

    private void executeAndWait(List<String> list) throws Exception {
        executeAndWait(list, TestHelper.TEST_UNSHARDED_KEYSPACE);
    }

    private void executeAndWait(List<String> list, String str) throws Exception {
        TestHelper.execute(list, str);
        this.consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
    }

    private static String topicName(String str) {
        return "test_server." + str;
    }

    private void validateFieldDef(Field field) {
        ConfigDef config = this.connector.config();
        Assertions.assertThat(config.names()).contains(new String[]{field.name()});
        ConfigDef.ConfigKey configKey = (ConfigDef.ConfigKey) config.configKeys().get(field.name());
        Assertions.assertThat(configKey).isNotNull();
        Assertions.assertThat(configKey.name).isEqualTo(field.name());
        Assertions.assertThat(configKey.displayName).isEqualTo(field.displayName());
        Assertions.assertThat(configKey.importance).isEqualTo(field.importance());
        Assertions.assertThat(configKey.documentation).isEqualTo(field.description());
        Assertions.assertThat(configKey.type).isEqualTo(field.type());
        if (field.equals(CommonConnectorConfig.TOPIC_NAMING_STRATEGY)) {
            Assertions.assertThat(((Class) configKey.defaultValue).getName()).isEqualTo((String) field.defaultValue());
        } else if (field.type() != ConfigDef.Type.LIST || configKey.defaultValue == null) {
            Assertions.assertThat(configKey.defaultValue).isEqualTo(field.defaultValue());
        } else {
            Assertions.assertThat(configKey.defaultValue).isEqualTo(Arrays.asList(field.defaultValue()));
        }
        Assertions.assertThat(configKey.dependents).isEqualTo(field.dependents());
        Assertions.assertThat(configKey.width).isNotNull();
        Assertions.assertThat(configKey.group).isNotNull();
        Assertions.assertThat(configKey.orderInGroup).isGreaterThan(0);
        Assertions.assertThat(configKey.validator).isNull();
        Assertions.assertThat(configKey.recommender).isNull();
    }
}
