package io.debezium.pipeline.source.snapshot.incremental;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Clock;
import java.util.List;
import java.util.Optional;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:io/debezium/pipeline/source/snapshot/incremental/SignalBasedSnapshotChangeEventSourceTest.class */
public class SignalBasedSnapshotChangeEventSourceTest {
    protected RelationalDatabaseConnectorConfig config() {
        return buildConfig(Configuration.create().with(RelationalDatabaseConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal").with(RelationalDatabaseConnectorConfig.TOPIC_PREFIX, "core").build());
    }

    protected RelationalDatabaseConnectorConfig buildConfig(Configuration configuration) {
        return new RelationalDatabaseConnectorConfig(configuration, null, null, 0, ColumnFilterMode.CATALOG, true) { // from class: io.debezium.pipeline.source.snapshot.incremental.SignalBasedSnapshotChangeEventSourceTest.1
            protected SourceInfoStructMaker<?> getSourceInfoStructMaker(CommonConnectorConfig.Version version) {
                return null;
            }

            public String getContextName() {
                return null;
            }

            public String getConnectorName() {
                return null;
            }
        };
    }

    @Test
    public void testBuildQueryOnePkColumn() {
        SignalBasedIncrementalSnapshotChangeEventSource signalBasedIncrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource(config(), new JdbcConnection(config().getJdbcConfig(), jdbcConfiguration -> {
            return null;
        }, "\"", "\""), (EventDispatcher) null, (DatabaseSchema) null, (Clock) null, SnapshotProgressListener.NO_OP(), DataChangeEventListener.NO_OP());
        SignalBasedIncrementalSnapshotContext signalBasedIncrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext();
        signalBasedIncrementalSnapshotChangeEventSource.setContext(signalBasedIncrementalSnapshotContext);
        Column create = Column.editor().name("pk1").create();
        Table create2 = Table.editor().tableId(new TableId((String) null, "s1", "table1")).addColumn(create).addColumn(Column.editor().name("val1").create()).addColumn(Column.editor().name("val2").create()).setPrimaryKeyNames(new String[]{"pk1"}).create();
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(create2, Optional.empty())).isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY \"pk1\" LIMIT 1024");
        signalBasedIncrementalSnapshotContext.nextChunkPosition(new Object[]{1, 5});
        signalBasedIncrementalSnapshotContext.maximumKey(new Object[]{10, 50});
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(create2, Optional.empty())).isEqualTo("SELECT * FROM \"s1\".\"table1\" WHERE (\"pk1\" > ?) AND NOT (\"pk1\" > ?) ORDER BY \"pk1\" LIMIT 1024");
    }

    @Test
    public void testBuildQueryOnePkColumnWithAdditionalCondition() {
        SignalBasedIncrementalSnapshotChangeEventSource signalBasedIncrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource(config(), new JdbcConnection(config().getJdbcConfig(), jdbcConfiguration -> {
            return null;
        }, "\"", "\""), (EventDispatcher) null, (DatabaseSchema) null, (Clock) null, SnapshotProgressListener.NO_OP(), DataChangeEventListener.NO_OP());
        SignalBasedIncrementalSnapshotContext signalBasedIncrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext();
        signalBasedIncrementalSnapshotChangeEventSource.setContext(signalBasedIncrementalSnapshotContext);
        Column create = Column.editor().name("pk1").create();
        Table create2 = Table.editor().tableId(new TableId((String) null, "s1", "table1")).addColumn(create).addColumn(Column.editor().name("val1").create()).addColumn(Column.editor().name("val2").create()).setPrimaryKeyNames(new String[]{"pk1"}).create();
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(create2, Optional.of("\"val1\"=foo"))).isEqualTo("SELECT * FROM \"s1\".\"table1\" WHERE \"val1\"=foo ORDER BY \"pk1\" LIMIT 1024");
        signalBasedIncrementalSnapshotContext.nextChunkPosition(new Object[]{1, 5});
        signalBasedIncrementalSnapshotContext.maximumKey(new Object[]{10, 50});
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(create2, Optional.of("\"val1\"=foo"))).isEqualTo("SELECT * FROM \"s1\".\"table1\" WHERE (\"pk1\" > ?) AND NOT (\"pk1\" > ?) AND \"val1\"=foo ORDER BY \"pk1\" LIMIT 1024");
    }

    @Test
    public void testBuildQueryTwoPkColumnsWithAdditionalConditionWithSurrogateKey() {
        SignalBasedIncrementalSnapshotChangeEventSource signalBasedIncrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource(config(), new JdbcConnection(config().getJdbcConfig(), jdbcConfiguration -> {
            return null;
        }, "\"", "\""), (EventDispatcher) null, (DatabaseSchema) null, (Clock) null, SnapshotProgressListener.NO_OP(), DataChangeEventListener.NO_OP());
        SignalBasedIncrementalSnapshotContext signalBasedIncrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext();
        signalBasedIncrementalSnapshotChangeEventSource.setContext(signalBasedIncrementalSnapshotContext);
        Column create = Column.editor().name("pk1").create();
        Column create2 = Column.editor().name("pk2").create();
        Table create3 = Table.editor().tableId(new TableId((String) null, "s1", "table1")).addColumn(create).addColumn(create2).addColumn(Column.editor().name("val1").create()).addColumn(Column.editor().name("val2").create()).setPrimaryKeyNames(new String[]{"pk1", "pk2"}).create();
        signalBasedIncrementalSnapshotContext.addDataCollectionNamesToSnapshot(List.of(create3.id().toString()), Optional.empty(), Optional.of("pk2"));
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(create3, Optional.of("\"val1\"=foo"))).isEqualTo("SELECT * FROM \"s1\".\"table1\" WHERE \"val1\"=foo ORDER BY \"pk2\" LIMIT 1024");
        signalBasedIncrementalSnapshotContext.nextChunkPosition(new Object[]{1, 5});
        signalBasedIncrementalSnapshotContext.maximumKey(new Object[]{10, 50});
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(create3, Optional.of("\"val1\"=foo"))).isEqualTo("SELECT * FROM \"s1\".\"table1\" WHERE (\"pk2\" > ?) AND NOT (\"pk2\" > ?) AND \"val1\"=foo ORDER BY \"pk2\" LIMIT 1024");
    }

    @Test
    public void testBuildQueryThreePkColumns() {
        SignalBasedIncrementalSnapshotChangeEventSource signalBasedIncrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource(config(), new JdbcConnection(config().getJdbcConfig(), jdbcConfiguration -> {
            return null;
        }, "\"", "\""), (EventDispatcher) null, (DatabaseSchema) null, (Clock) null, SnapshotProgressListener.NO_OP(), DataChangeEventListener.NO_OP());
        SignalBasedIncrementalSnapshotContext signalBasedIncrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext();
        signalBasedIncrementalSnapshotChangeEventSource.setContext(signalBasedIncrementalSnapshotContext);
        Column create = Column.editor().name("pk1").create();
        Column create2 = Column.editor().name("pk2").create();
        Column create3 = Column.editor().name("pk3").create();
        Table create4 = Table.editor().tableId(new TableId((String) null, "s1", "table1")).addColumn(create).addColumn(create2).addColumn(create3).addColumn(Column.editor().name("val1").create()).addColumn(Column.editor().name("val2").create()).setPrimaryKeyNames(new String[]{"pk1", "pk2", "pk3"}).create();
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(create4, Optional.empty())).isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY \"pk1\", \"pk2\", \"pk3\" LIMIT 1024");
        signalBasedIncrementalSnapshotContext.nextChunkPosition(new Object[]{1, 5});
        signalBasedIncrementalSnapshotContext.maximumKey(new Object[]{10, 50});
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(create4, Optional.empty())).isEqualTo("SELECT * FROM \"s1\".\"table1\" WHERE ((\"pk1\" > ?) OR (\"pk1\" = ? AND \"pk2\" > ?) OR (\"pk1\" = ? AND \"pk2\" = ? AND \"pk3\" > ?)) AND NOT ((\"pk1\" > ?) OR (\"pk1\" = ? AND \"pk2\" > ?) OR (\"pk1\" = ? AND \"pk2\" = ? AND \"pk3\" > ?)) ORDER BY \"pk1\", \"pk2\", \"pk3\" LIMIT 1024");
    }

    @Test
    public void testBuildQueryThreePkColumnsWithAdditionalCondition() {
        SignalBasedIncrementalSnapshotChangeEventSource signalBasedIncrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource(config(), new JdbcConnection(config().getJdbcConfig(), jdbcConfiguration -> {
            return null;
        }, "\"", "\""), (EventDispatcher) null, (DatabaseSchema) null, (Clock) null, SnapshotProgressListener.NO_OP(), DataChangeEventListener.NO_OP());
        SignalBasedIncrementalSnapshotContext signalBasedIncrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext();
        signalBasedIncrementalSnapshotChangeEventSource.setContext(signalBasedIncrementalSnapshotContext);
        Column create = Column.editor().name("pk1").create();
        Column create2 = Column.editor().name("pk2").create();
        Column create3 = Column.editor().name("pk3").create();
        Table create4 = Table.editor().tableId(new TableId((String) null, "s1", "table1")).addColumn(create).addColumn(create2).addColumn(create3).addColumn(Column.editor().name("val1").create()).addColumn(Column.editor().name("val2").create()).setPrimaryKeyNames(new String[]{"pk1", "pk2", "pk3"}).create();
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(create4, Optional.of("\"val1\"=foo"))).isEqualTo("SELECT * FROM \"s1\".\"table1\" WHERE \"val1\"=foo ORDER BY \"pk1\", \"pk2\", \"pk3\" LIMIT 1024");
        signalBasedIncrementalSnapshotContext.nextChunkPosition(new Object[]{1, 5});
        signalBasedIncrementalSnapshotContext.maximumKey(new Object[]{10, 50});
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(create4, Optional.of("\"val1\"=foo"))).isEqualTo("SELECT * FROM \"s1\".\"table1\" WHERE ((\"pk1\" > ?) OR (\"pk1\" = ? AND \"pk2\" > ?) OR (\"pk1\" = ? AND \"pk2\" = ? AND \"pk3\" > ?)) AND NOT ((\"pk1\" > ?) OR (\"pk1\" = ? AND \"pk2\" > ?) OR (\"pk1\" = ? AND \"pk2\" = ? AND \"pk3\" > ?)) AND \"val1\"=foo ORDER BY \"pk1\", \"pk2\", \"pk3\" LIMIT 1024");
    }

    @Test
    public void testBuildQueryTwoPkColumnsWithSurrogateKey() {
        SignalBasedIncrementalSnapshotChangeEventSource signalBasedIncrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource(config(), new JdbcConnection(config().getJdbcConfig(), jdbcConfiguration -> {
            return null;
        }, "\"", "\""), (EventDispatcher) null, (DatabaseSchema) null, (Clock) null, SnapshotProgressListener.NO_OP(), DataChangeEventListener.NO_OP());
        SignalBasedIncrementalSnapshotContext signalBasedIncrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext();
        signalBasedIncrementalSnapshotChangeEventSource.setContext(signalBasedIncrementalSnapshotContext);
        Column create = Column.editor().name("pk1").create();
        Column create2 = Column.editor().name("pk2").create();
        Table create3 = Table.editor().tableId(new TableId((String) null, "s1", "table1")).addColumn(create).addColumn(create2).addColumn(Column.editor().name("val1").create()).addColumn(Column.editor().name("val2").create()).setPrimaryKeyNames(new String[]{"pk1", "pk2"}).create();
        signalBasedIncrementalSnapshotContext.addDataCollectionNamesToSnapshot(List.of(create3.id().toString()), Optional.empty(), Optional.of("pk2"));
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(create3, Optional.empty())).isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY \"pk2\" LIMIT 1024");
    }

    @Test
    public void testMaxQuery() {
        SignalBasedIncrementalSnapshotChangeEventSource signalBasedIncrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource(config(), new JdbcConnection(config().getJdbcConfig(), jdbcConfiguration -> {
            return null;
        }, "\"", "\""), (EventDispatcher) null, (DatabaseSchema) null, (Clock) null, SnapshotProgressListener.NO_OP(), DataChangeEventListener.NO_OP());
        Column create = Column.editor().name("pk1").create();
        Column create2 = Column.editor().name("pk2").create();
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildMaxPrimaryKeyQuery(Table.editor().tableId(new TableId((String) null, "s1", "table1")).addColumn(create).addColumn(create2).addColumn(Column.editor().name("val1").create()).addColumn(Column.editor().name("val2").create()).setPrimaryKeyNames(new String[]{"pk1", "pk2"}).create(), Optional.empty())).isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY \"pk1\" DESC, \"pk2\" DESC LIMIT 1");
    }

    @Test
    public void testMaxQueryWithAdditionalCondition() {
        SignalBasedIncrementalSnapshotChangeEventSource signalBasedIncrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource(config(), new JdbcConnection(config().getJdbcConfig(), jdbcConfiguration -> {
            return null;
        }, "\"", "\""), (EventDispatcher) null, (DatabaseSchema) null, (Clock) null, SnapshotProgressListener.NO_OP(), DataChangeEventListener.NO_OP());
        Column create = Column.editor().name("pk1").create();
        Column create2 = Column.editor().name("pk2").create();
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildMaxPrimaryKeyQuery(Table.editor().tableId(new TableId((String) null, "s1", "table1")).addColumn(create).addColumn(create2).addColumn(Column.editor().name("val1").create()).addColumn(Column.editor().name("val2").create()).setPrimaryKeyNames(new String[]{"pk1", "pk2"}).create(), Optional.of("\"val1\"=foo"))).isEqualTo("SELECT * FROM \"s1\".\"table1\" WHERE \"val1\"=foo ORDER BY \"pk1\" DESC, \"pk2\" DESC LIMIT 1");
    }

    @Test
    public void testMaxQueryWithSurrogateKey() {
        SignalBasedIncrementalSnapshotChangeEventSource signalBasedIncrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource(config(), new JdbcConnection(config().getJdbcConfig(), jdbcConfiguration -> {
            return null;
        }, "\"", "\""), (EventDispatcher) null, (DatabaseSchema) null, (Clock) null, SnapshotProgressListener.NO_OP(), DataChangeEventListener.NO_OP());
        SignalBasedIncrementalSnapshotContext signalBasedIncrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext();
        signalBasedIncrementalSnapshotChangeEventSource.setContext(signalBasedIncrementalSnapshotContext);
        Column create = Column.editor().name("pk1").create();
        Column create2 = Column.editor().name("pk2").create();
        Table create3 = Table.editor().tableId(new TableId((String) null, "s1", "table1")).addColumn(create).addColumn(create2).addColumn(Column.editor().name("val1").create()).addColumn(Column.editor().name("val2").create()).setPrimaryKeyNames(new String[]{"pk1", "pk2"}).create();
        signalBasedIncrementalSnapshotContext.addDataCollectionNamesToSnapshot(List.of(create3.id().toString()), Optional.empty(), Optional.of("pk2"));
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildMaxPrimaryKeyQuery(create3, Optional.empty())).isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY \"pk2\" DESC LIMIT 1");
    }

    @Test
    @FixFor({"DBZ-5727"})
    public void testBuildProjectionWithColumnIncludeList() {
        RelationalDatabaseConnectorConfig buildConfig = buildConfig(config().getJdbcConfig().edit().with(RelationalDatabaseConnectorConfig.COLUMN_INCLUDE_LIST, ".*\\.(pk1|pk2|val1|val2)$").build());
        SignalBasedIncrementalSnapshotChangeEventSource signalBasedIncrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource(buildConfig, new JdbcConnection(buildConfig.getJdbcConfig(), jdbcConfiguration -> {
            return null;
        }, "\"", "\""), (EventDispatcher) null, (DatabaseSchema) null, (Clock) null, SnapshotProgressListener.NO_OP(), DataChangeEventListener.NO_OP());
        signalBasedIncrementalSnapshotChangeEventSource.setContext(new SignalBasedIncrementalSnapshotContext());
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(createTwoPrimaryKeysTable(), Optional.empty())).isEqualTo("SELECT \"pk1\", \"pk2\", \"val1\", \"val2\" FROM \"s1\".\"table1\" ORDER BY \"pk1\", \"pk2\" LIMIT 1024");
    }

    @Test
    @FixFor({"DBZ-5727"})
    public void testBuildProjectionWithColumnExcludeList() {
        RelationalDatabaseConnectorConfig buildConfig = buildConfig(config().getJdbcConfig().edit().with(RelationalDatabaseConnectorConfig.COLUMN_EXCLUDE_LIST, ".*\\.(pk2|val3)$").build());
        SignalBasedIncrementalSnapshotChangeEventSource signalBasedIncrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource(buildConfig, new JdbcConnection(buildConfig.getJdbcConfig(), jdbcConfiguration -> {
            return null;
        }, "\"", "\""), (EventDispatcher) null, (DatabaseSchema) null, (Clock) null, SnapshotProgressListener.NO_OP(), DataChangeEventListener.NO_OP());
        signalBasedIncrementalSnapshotChangeEventSource.setContext(new SignalBasedIncrementalSnapshotContext());
        Assertions.assertThat(signalBasedIncrementalSnapshotChangeEventSource.buildChunkQuery(createTwoPrimaryKeysTable(), Optional.empty())).isEqualTo("SELECT \"pk1\", \"val1\", \"val2\" FROM \"s1\".\"table1\" ORDER BY \"pk1\", \"pk2\" LIMIT 1024");
    }

    private Table createTwoPrimaryKeysTable() {
        Column create = Column.editor().name("pk1").create();
        Column create2 = Column.editor().name("pk2").create();
        Column create3 = Column.editor().name("val1").create();
        return Table.editor().tableId(new TableId((String) null, "s1", "table1")).addColumn(create).addColumn(create2).addColumn(create3).addColumn(Column.editor().name("val2").create()).addColumn(Column.editor().name("val3").create()).setPrimaryKeyNames(new String[]{"pk1", "pk2"}).create();
    }
}
