package io.debezium.pipeline.source.snapshot.incremental;

import io.debezium.jdbc.JdbcConnection;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotWithSchemaChangesSupportTest.class */
public abstract class AbstractIncrementalSnapshotWithSchemaChangesSupportTest<T extends SourceConnector> extends AbstractIncrementalSnapshotTest<T> {
    protected abstract String tableName(String str);

    protected abstract String alterColumnStatement(String str, String str2, String str3);

    protected abstract String alterColumnSetNotNullStatement(String str, String str2, String str3);

    protected abstract String alterColumnDropNotNullStatement(String str, String str2, String str3);

    protected abstract String alterColumnSetDefaultStatement(String str, String str2, String str3, String str4);

    protected abstract String alterColumnDropDefaultStatement(String str, String str2, String str3);

    protected abstract void executeRenameTable(JdbcConnection jdbcConnection, String str) throws SQLException;

    protected abstract String createTableStatement(String str, String str2);

    @Test
    public void schemaChanges() throws Exception {
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.setAutoCommit(true);
            databaseConnection.execute(new String[]{String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)", tableName(), 1011, 1010)});
            databaseConnection.execute(new String[]{alterColumnStatement(tableName(), "aa", "VARCHAR(5)")});
            databaseConnection.execute(new String[]{String.format("INSERT INTO %s (pk, aa) VALUES (%s, '%s')", tableName(), 1010, 1009)});
            for (int i = 0; i < 9; i += 3) {
                if (Thread.interrupted()) {
                    break;
                }
                databaseConnection.execute(new String[]{String.format("ALTER TABLE %s ADD c INT", tableName())});
                databaseConnection.execute(new String[]{String.format("INSERT INTO %s (pk, aa, c) VALUES (%s, '%s', %s)", tableName(), Integer.valueOf(i + 1000 + 1), Integer.valueOf(i + 1000), 1)});
                databaseConnection.execute(new String[]{alterColumnStatement(tableName(), "c", "VARCHAR(5)")});
                databaseConnection.execute(new String[]{String.format("INSERT INTO %s (pk, aa, c) VALUES (%s, '%s', '%s')", tableName(), Integer.valueOf(i + 1000 + 2), Integer.valueOf(i + 1000 + 1), "1")});
                databaseConnection.execute(new String[]{String.format("ALTER TABLE %s DROP COLUMN c", tableName())});
                databaseConnection.execute(new String[]{String.format("INSERT INTO %s (pk, aa) VALUES (%s, '%s')", tableName(), Integer.valueOf(i + 1000 + 3), Integer.valueOf(i + 1000 + 2))});
            }
            if (databaseConnection != null) {
                databaseConnection.close();
            }
            Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot = consumeRecordsMixedWithIncrementalSnapshot(1011);
            for (int i2 = 0; i2 < 1011; i2++) {
                Assert.assertTrue(String.format("missing PK %d", Integer.valueOf(i2 + 1)), consumeRecordsMixedWithIncrementalSnapshot.containsKey(Integer.valueOf(i2 + 1)));
                SourceRecord sourceRecord = consumeRecordsMixedWithIncrementalSnapshot.get(Integer.valueOf(i2 + 1));
                if (sourceRecord.valueSchema().field("after").schema().field(valueFieldName()).schema().type() == Schema.Type.INT32) {
                    Assert.assertEquals(i2, ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName()).intValue());
                } else {
                    Assert.assertEquals(Integer.toString(i2), ((Struct) sourceRecord.value()).getStruct("after").getString(valueFieldName()));
                }
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void renameTable() throws Exception {
        populateTable();
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.setAutoCommit(true);
            databaseConnection.execute(new String[]{createTableStatement(tableName("new_table"), tableName())});
            databaseConnection.execute(new String[]{String.format("INSERT INTO %s SELECT * FROM %s", tableName("new_table"), tableName())});
            databaseConnection.execute(new String[]{String.format("ALTER TABLE %s ADD c varchar(5)", tableName("new_table"))});
            if (databaseConnection != null) {
                databaseConnection.close();
            }
            startConnector();
            sendAdHocSnapshotSignal();
            AtomicInteger atomicInteger = new AtomicInteger();
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot = consumeRecordsMixedWithIncrementalSnapshot(1000, entry -> {
                return true;
            }, list -> {
                if (atomicInteger.addAndGet(list.size()) <= 10 || atomicBoolean.get()) {
                    return;
                }
                try {
                    JdbcConnection databaseConnection2 = databaseConnection();
                    try {
                        executeRenameTable(databaseConnection2, "new_table");
                        databaseConnection2.executeWithoutCommitting(new String[]{String.format("UPDATE %s SET c = 'c' WHERE pk >= %s", tableName(), 990)});
                        databaseConnection2.commit();
                        if (databaseConnection2 != null) {
                            databaseConnection2.close();
                        }
                        atomicBoolean.set(true);
                    } finally {
                    }
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            });
            for (int i = 0; i < 990; i++) {
                Assert.assertTrue(consumeRecordsMixedWithIncrementalSnapshot.containsKey(Integer.valueOf(i + 1)));
                SourceRecord sourceRecord = consumeRecordsMixedWithIncrementalSnapshot.get(Integer.valueOf(i + 1));
                Assert.assertEquals(i, ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName()).intValue());
                if (((Struct) sourceRecord.value()).schema().field("c") != null) {
                    Assert.assertNull(((Struct) sourceRecord.value()).getStruct("after").getString("c"));
                }
            }
            for (int i2 = 990; i2 < 1000; i2++) {
                Assert.assertTrue(consumeRecordsMixedWithIncrementalSnapshot.containsKey(Integer.valueOf(i2 + 1)));
                SourceRecord sourceRecord2 = consumeRecordsMixedWithIncrementalSnapshot.get(Integer.valueOf(i2 + 1));
                Assert.assertEquals(i2, ((Struct) sourceRecord2.value()).getStruct("after").getInt32(valueFieldName()).intValue());
                Assert.assertEquals("c", ((Struct) sourceRecord2.value()).getStruct("after").getString("c"));
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void columnNullabilityChanges() throws Exception {
        populateTable();
        startAndConsumeTillEnd(connectorClass(), config().build());
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();
        sendAdHocSnapshotSignal();
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.setAutoCommit(false);
            databaseConnection.execute(new String[]{alterColumnSetNotNullStatement(tableName(), "aa", "INTEGER")});
            databaseConnection.commit();
            databaseConnection.execute(new String[]{alterColumnDropNotNullStatement(tableName(), "aa", "INTEGER")});
            databaseConnection.commit();
            for (int i = 0; i < 1000; i++) {
                databaseConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (pk, aa) VALUES (%s, null)", tableName(), Integer.valueOf(i + 1000 + 1))});
            }
            databaseConnection.commit();
            if (databaseConnection != null) {
                databaseConnection.close();
            }
            Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot = consumeRecordsMixedWithIncrementalSnapshot(2000);
            for (int i2 = 0; i2 < 1000; i2++) {
                Assert.assertEquals(i2, ((Struct) consumeRecordsMixedWithIncrementalSnapshot.get(Integer.valueOf(i2 + 1)).value()).getStruct("after").getInt32(valueFieldName()).intValue());
            }
            for (int i3 = 1000; i3 < 2000; i3++) {
                Assert.assertNull(((Struct) consumeRecordsMixedWithIncrementalSnapshot.get(Integer.valueOf(i3 + 1)).value()).getStruct("after").getInt32(valueFieldName()));
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void columnDefaultChanges() throws Exception {
        populateTable();
        startAndConsumeTillEnd(connectorClass(), config().build());
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();
        sendAdHocSnapshotSignal();
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.setAutoCommit(false);
            databaseConnection.execute(new String[]{alterColumnSetDefaultStatement(tableName(), "aa", "INTEGER", "-6")});
            databaseConnection.commit();
            for (int i = 0; i < 1000; i++) {
                databaseConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (pk) VALUES (%s)", tableName(), Integer.valueOf(i + 1000 + 1))});
            }
            databaseConnection.commit();
            databaseConnection.executeWithoutCommitting(new String[]{alterColumnDropDefaultStatement(tableName(), "aa", "INTEGER")});
            databaseConnection.executeWithoutCommitting(new String[]{alterColumnSetDefaultStatement(tableName(), "aa", "INTEGER", "-9")});
            databaseConnection.commit();
            for (int i2 = 0; i2 < 1000; i2++) {
                databaseConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (pk) VALUES (%s)", tableName(), Integer.valueOf(i2 + 2000 + 1))});
            }
            databaseConnection.commit();
            databaseConnection.executeWithoutCommitting(new String[]{alterColumnDropDefaultStatement(tableName(), "aa", "INTEGER")});
            databaseConnection.commit();
            for (int i3 = 0; i3 < 1000; i3++) {
                databaseConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (pk) VALUES (%s)", tableName(), Integer.valueOf(i3 + 3000 + 1))});
            }
            databaseConnection.commit();
            if (databaseConnection != null) {
                databaseConnection.close();
            }
            Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot = consumeRecordsMixedWithIncrementalSnapshot(4000);
            for (int i4 = 0; i4 < 1000; i4++) {
                Assert.assertEquals(i4, ((Struct) consumeRecordsMixedWithIncrementalSnapshot.get(Integer.valueOf(i4 + 1)).value()).getStruct("after").getInt32(valueFieldName()).intValue());
            }
            for (int i5 = 1000; i5 < 2000; i5++) {
                Integer int32 = ((Struct) consumeRecordsMixedWithIncrementalSnapshot.get(Integer.valueOf(i5 + 1)).value()).getStruct("after").getInt32(valueFieldName());
                Assert.assertNotNull("value is null at pk=" + (i5 + 1), int32);
                Assert.assertEquals(String.format("value is %d at pk = %d, expected -6", int32, Integer.valueOf(i5 + 1)), -6.0f, int32.intValue(), 0.0f);
            }
            for (int i6 = 2000; i6 < 3000; i6++) {
                Integer int322 = ((Struct) consumeRecordsMixedWithIncrementalSnapshot.get(Integer.valueOf(i6 + 1)).value()).getStruct("after").getInt32(valueFieldName());
                Assert.assertNotNull("value is null at pk=" + (i6 + 1), int322);
                Assert.assertEquals(String.format("value is %d at pk = %d, expected -9", int322, Integer.valueOf(i6 + 1)), -9.0f, int322.intValue(), 0.0f);
            }
            for (int i7 = 3000; i7 < 4000; i7++) {
                Assert.assertNull("value is not null at pk=" + (i7 + 1), ((Struct) consumeRecordsMixedWithIncrementalSnapshot.get(Integer.valueOf(i7 + 1)).value()).getStruct("after").getInt32(valueFieldName()));
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
