package io.debezium.pipeline.source.snapshot.incremental;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenConnectorUnderTest;
import io.debezium.junit.SkipWhenConnectorsUnderTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.kafka.KafkaCluster;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.util.Testing;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.class */
public abstract class AbstractIncrementalSnapshotTest<T extends SourceConnector> extends AbstractSnapshotTest<T> {
    // Kafka cluster whose broker list sendKafkaSignal() uses as the producer's bootstrap servers.
    // NOTE(review): never assigned in the visible part of this class - presumably initialised by subclasses; confirm.
    protected static KafkaCluster kafka;

    /**
     * Returns the literal {@code "type"}.
     * NOTE(review): judging by the method name this is the name of the signal's type field and is
     * meant to be overridable by subclasses - confirm against callers.
     */
    protected String getSignalTypeFieldName() {
        return "type";
    }

    /**
     * @return the topic name the no-primary-key tests in this class pass to
     *         {@code consumeMixedWithIncrementalSnapshot} when reading change events
     */
    protected abstract String noPKTopicName();

    /**
     * @return the name of the table the no-primary-key tests in this class populate; it is used
     *         verbatim as the target of {@code INSERT INTO} statements
     */
    protected abstract String noPKTableName();

    /**
     * Identifier sent in snapshot signals for the no-primary-key table.
     *
     * @return by default the same value as {@link #noPKTableName()}
     */
    protected String noPKTableDataCollectionId() {
        return noPKTableName();
    }

    /**
     * Maps a column name to the name used to look the field up in emitted records.
     * This default returns the argument unchanged.
     * NOTE(review): presumably overridden where the database reports identifiers in a different case - confirm.
     *
     * @param str the column name as written in the test
     * @return the same string
     */
    protected String returnedIdentifierName(String str) {
        return str;
    }

    /**
     * Inserts a {@code stop-snapshot} signal of type {@code INCREMENTAL} into the signal table.
     *
     * <p>This is best effort: any failure while opening the connection, executing the statement or
     * closing the connection is logged at WARN level and not propagated.
     *
     * @param strArr data collection identifiers to list in the signal's {@code data-collections}
     *               array; when empty, the attribute is left out of the payload
     * @throws SQLException kept in the signature for callers; failures are caught and logged here
     */
    protected void sendAdHocSnapshotStopSignal(String... strArr) throws SQLException {
        String collections = "";
        if (strArr.length > 0) {
            String quotedIds = Arrays.stream(strArr)
                    .map(id -> "\"" + id + "\"")
                    .collect(Collectors.joining(", "));
            collections = ",\"data-collections\": [" + quotedIds + "]";
        }
        // try-with-resources closes the connection even when execute() throws.
        try (JdbcConnection connection = databaseConnection()) {
            // The collections fragment is a format argument (not part of the pattern) so that a '%'
            // inside an identifier cannot be misread as a format specifier.
            String query = String.format(
                    "INSERT INTO %s VALUES('ad-hoc', 'stop-snapshot', '{\"type\": \"INCREMENTAL\"%s}')",
                    signalTableName(), collections);
            this.logger.info("Sending signal with query {}", query);
            connection.execute(query);
        } catch (Exception e) {
            this.logger.warn("Failed to send signal", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sends an ad-hoc snapshot signal for the single collection returned by
     * {@code tableDataCollectionId()}, delegating to the varargs overload.
     *
     * @throws SQLException as declared by the overload this delegates to
     */
    public void sendAdHocSnapshotSignal() throws SQLException {
        sendAdHocSnapshotSignal(tableDataCollectionId());
    }

    /**
     * Sends an execute-snapshot signal for {@code tableDataCollectionId()} over Kafka by delegating
     * to {@link #sendExecuteSnapshotKafkaSignal(String)}.
     *
     * @throws ExecutionException   if sending the record fails
     * @throws InterruptedException if the thread is interrupted while waiting for the send
     */
    protected void sendAdHocKafkaSnapshotSignal() throws ExecutionException, InterruptedException {
        sendExecuteSnapshotKafkaSignal(tableDataCollectionId());
    }

    /**
     * Builds the JSON payload of an {@code execute-snapshot} signal of type {@code INCREMENTAL} for
     * one data collection and publishes it through {@link #sendKafkaSignal(String)}.
     *
     * @param str identifier placed in the payload's {@code data-collections} array
     * @throws ExecutionException   if sending the record fails
     * @throws InterruptedException if the thread is interrupted while waiting for the send
     */
    protected void sendExecuteSnapshotKafkaSignal(String str) throws ExecutionException, InterruptedException {
        final String payloadTemplate = "{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}";
        final String signalValue = String.format(payloadTemplate, str);
        sendKafkaSignal(signalValue);
    }

    /**
     * @return the topic {@link #sendKafkaSignal(String)} publishes to; {@code "signals_topic"} by default
     */
    protected String getSignalsTopic() {
        return "signals_topic";
    }

    /**
     * Publishes a signal to {@link #getSignalsTopic()} (partition {@code 0}, key
     * {@code "test_server"}) and waits for the send to complete.
     *
     * <p>A new producer is created per call against {@code kafka.brokerList()} and is always closed,
     * whether or not the send succeeds.
     *
     * @param str the signal payload, sent as the record value
     * @throws ExecutionException   if the send fails
     * @throws InterruptedException if the thread is interrupted while waiting for the send
     */
    protected void sendKafkaSignal(String str) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> signalRecord = new ProducerRecord<>(getSignalsTopic(), 0, "test_server", str);
        Configuration producerConfig = Configuration.create()
                .withDefault("bootstrap.servers", kafka.brokerList())
                .withDefault("client.id", "signals")
                .withDefault("key.serializer", StringSerializer.class)
                .withDefault("value.serializer", StringSerializer.class)
                .build();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig.asProperties())) {
            // get() blocks on the returned future so a failed send surfaces here.
            producer.send(signalRecord).get();
        }
    }

    /**
     * Inserts a {@code pause-snapshot} signal (id {@code test-pause}, empty data) into the signal table.
     *
     * <p>This is best effort: any failure is logged at WARN level and not propagated.
     */
    protected void sendPauseSignal() {
        // try-with-resources closes the connection even when execute() throws.
        try (JdbcConnection connection = databaseConnection()) {
            String query = String.format("INSERT INTO %s VALUES('test-pause', 'pause-snapshot', '')", signalTableName());
            this.logger.info("Sending pause signal with query {}", query);
            connection.execute(query);
        } catch (Exception e) {
            this.logger.warn("Failed to send pause signal", e);
        }
    }

    /**
     * Inserts a {@code resume-snapshot} signal (id {@code test-resume}, empty data) into the signal table.
     *
     * <p>This is best effort: any failure is logged at WARN level and not propagated.
     */
    protected void sendResumeSignal() {
        // try-with-resources closes the connection even when execute() throws.
        try (JdbcConnection connection = databaseConnection()) {
            String query = String.format("INSERT INTO %s VALUES('test-resume', 'resume-snapshot', '')", signalTableName());
            this.logger.info("Sending resume signal with query {}", query);
            connection.execute(query);
        } catch (Exception e) {
            this.logger.warn("Failed to send resume signal", e);
        }
    }

    /**
     * Triggers an incremental snapshot with no concurrent changes and expects every key
     * {@code 1..1000} to arrive with value {@code key - 1}.
     */
    @Test
    public void snapshotOnly() throws Exception {
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();

        final int expectedRecordCount = 1000;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int pk = 1; pk <= expectedRecordCount; pk++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(pk, pk - 1));
        }
    }

    /**
     * Sends a snapshot signal that lists the real collection between two identifiers named
     * {@code invalid1} and {@code invalid2}, and expects the real collection's keys
     * {@code 1..1000} to still arrive with value {@code key - 1}.
     */
    @Test
    public void invalidTablesInTheList() throws Exception {
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal("invalid1", tableDataCollectionId(), "invalid2");

        final int expectedRecordCount = 1000;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int pk = 1; pk <= expectedRecordCount; pk++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(pk, pk - 1));
        }
    }

    /**
     * Inserts 1000 further rows (keys {@code 1001..2000}) in one transaction right after the
     * snapshot signal, then expects all keys {@code 1..2000} to arrive with value {@code key - 1}.
     */
    @Test
    public void inserts() throws Exception {
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();

        // The connection is only needed for the inserts; try-with-resources closes it exactly once,
        // whether or not a statement fails, before the records are consumed.
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; i++) {
                connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
                        tableName(), connection.quotedColumnIdString(pkFieldName()), i + 1000 + 1, i + 1000));
            }
            connection.commit();
        }

        final int expectedRecordCount = 2000;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
        }
    }

    /**
     * Same scenario as {@code inserts()}: 1000 further rows are inserted after the snapshot signal
     * and all keys {@code 1..2000} are expected with value {@code key - 1}.
     *
     * <p>NOTE(review): despite the name, this body sends the signal with
     * {@code sendAdHocSnapshotSignal()} rather than {@code sendAdHocKafkaSnapshotSignal()}. Left
     * unchanged because switching would require a running Kafka cluster and a connector configured
     * for the Kafka signal channel - confirm whether subclasses override this test.
     */
    @Test
    public void insertsWithKafkaSnapshotSignal() throws Exception {
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();

        // The connection is only needed for the inserts; try-with-resources closes it exactly once,
        // whether or not a statement fails, before the records are consumed.
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; i++) {
                connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
                        tableName(), connection.quotedColumnIdString(pkFieldName()), i + 1000 + 1, i + 1000));
            }
            connection.commit();
        }

        final int expectedRecordCount = 2000;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
        }
    }

    /**
     * Snapshots the table returned by {@link #noPKTableName()} after filling it with
     * {@code populate4PkTable}, and expects ids {@code 1..1000} with value {@code id - 1}.
     *
     * <p>The record id is derived from the four columns as
     * {@code pk1 * 1000 + pk2 * 100 + pk3 * 10 + pk4}.
     */
    @Test
    public void insertsWithoutPks() throws Exception {
        // Close the connection as soon as the table is populated, even if population fails.
        try (JdbcConnection connection = databaseConnection()) {
            populate4PkTable(connection, noPKTableName());
        }

        startConnector();
        sendAdHocSnapshotSignal(noPKTableDataCollectionId());

        final int expectedRecordCount = 1000;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
                expectedRecordCount,
                entry -> true,
                key -> key.getInt32(returnedIdentifierName("pk1")) * 1000
                        + key.getInt32(returnedIdentifierName("pk2")) * 100
                        + key.getInt32(returnedIdentifierName("pk3")) * 10
                        + key.getInt32(returnedIdentifierName("pk4")),
                record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()),
                noPKTopicName(),
                null);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
        }
    }

    /**
     * Snapshots the no-primary-key table when some {@code pk1}/{@code pk2} values are {@code NULL}.
     *
     * <p>Nine rows are inserted for {@code pk1} in {10, 20, 30} and {@code pk2} in {1, 2, 3}, with
     * {@code aa = pk1 + pk2}; {@code pk1 = 10} and {@code pk2 = 1} are written as {@code NULL}.
     * When reading, a null {@code pk1} is mapped back to 10 and a null {@code pk2} to 1, so every
     * record is expected under id {@code pk1 + pk2} with the same value. The chunk size is 1.
     */
    @Test
    public void insertsWithoutPksAndNull() throws Exception {
        // try-with-resources closes the connection exactly once, whether or not an insert fails.
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int pk1 = 10; pk1 <= 30; pk1 += 10) {
                String pk1Sql = pk1 == 10 ? "NULL" : String.valueOf(pk1);
                for (int pk2 = 1; pk2 <= 3; pk2++) {
                    String pk2Sql = pk2 == 1 ? "NULL" : String.valueOf(pk2);
                    connection.executeWithoutCommitting(String.format(
                            "INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, 0, 0, %s)",
                            noPKTableName(), pk1Sql, pk2Sql, pk1 + pk2));
                }
            }
            connection.commit();
        }

        startConnector(builder -> builder.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1));
        sendAdHocSnapshotSignal(noPKTableDataCollectionId());

        final int expectedRecordCount = 9;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
                expectedRecordCount,
                entry -> true,
                key -> Objects.requireNonNullElse(key.getInt32(returnedIdentifierName("pk1")), 10)
                        + Objects.requireNonNullElse(key.getInt32(returnedIdentifierName("pk2")), 1),
                record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()),
                noPKTopicName(),
                null);
        for (int pk1 = 10; pk1 <= 30; pk1 += 10) {
            for (int pk2 = 1; pk2 <= 3; pk2++) {
                Assertions.assertThat(dbChanges).contains(Assertions.entry(pk1 + pk2, pk1 + pk2));
            }
        }
    }

    /**
     * Runs 1000 committed range updates ({@code aa = aa + 2000} for ten keys at a time) while the
     * snapshot is in progress, and expects every key {@code 1..1000} to end with value
     * {@code key - 1 + 2000}. Consumption stops once each of the 1000 keys holds a value of at
     * least 2000.
     */
    @Test
    public void updates() throws Exception {
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();

        // try-with-resources closes the connection exactly once, whether or not an update fails.
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; i++) {
                String pkColumn = connection.quotedColumnIdString(pkFieldName());
                connection.executeWithoutCommitting(String.format(
                        "UPDATE %s SET aa = aa + 2000 WHERE %s > %s AND %s <= %s",
                        tableName(), pkColumn, i * 10, pkColumn, (i + 1) * 10));
                // Each range update is committed separately.
                connection.commit();
            }
        }

        final int expectedRecordCount = 1000;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
                expectedRecordCount, entry -> entry.getValue() >= 2000, null);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i + 2000));
        }
    }

    /**
     * Same update scenario as {@code updates()}, but the connector is stopped and started again
     * once, after more than 50 records have been consumed. Every key {@code 1..1000} is still
     * expected to end with value {@code key - 1 + 2000}.
     */
    @Test
    public void updatesWithRestart() throws Exception {
        populateTable();
        final Configuration config = config().build();
        startAndConsumeTillEnd(connectorClass(), config);
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();
        sendAdHocSnapshotSignal();

        // try-with-resources closes the connection exactly once, whether or not an update fails.
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; i++) {
                String pkColumn = connection.quotedColumnIdString(pkFieldName());
                connection.executeWithoutCommitting(String.format(
                        "UPDATE %s SET aa = aa + 2000 WHERE %s > %s AND %s <= %s",
                        tableName(), pkColumn, i * 10, pkColumn, (i + 1) * 10));
                // Each range update is committed separately.
                connection.commit();
            }
        }

        final int expectedRecordCount = 1000;
        final AtomicInteger recordCounter = new AtomicInteger();
        final AtomicBoolean restarted = new AtomicBoolean();
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
                expectedRecordCount,
                entry -> entry.getValue() >= 2000,
                records -> {
                    // Restart the connector once, after more than 50 records have been seen.
                    if (recordCounter.addAndGet(records.size()) > 50 && !restarted.get()) {
                        stopConnector();
                        assertConnectorNotRunning();
                        start(connectorClass(), config);
                        waitForConnectorToStart();
                        restarted.set(true);
                    }
                });
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i + 2000));
        }
    }

    /**
     * Updates every row with a single statement ({@code aa = aa + 2000}) while snapshotting with a
     * chunk size of 1000, and expects every key {@code 1..1000} to end with value
     * {@code key - 1 + 2000}.
     */
    @Test
    public void updatesLargeChunk() throws Exception {
        populateTable();
        startConnector(builder -> builder.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1000));
        sendAdHocSnapshotSignal();

        // try-with-resources closes the connection exactly once, whether or not the update fails.
        try (JdbcConnection connection = databaseConnection()) {
            connection.execute(String.format("UPDATE %s SET aa = aa + 2000", tableName()));
        }

        final int expectedRecordCount = 1000;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
                expectedRecordCount, entry -> entry.getValue() >= 2000, null);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i + 2000));
        }
    }

    /**
     * Snapshot without concurrent changes where the connector is stopped and started again once,
     * after more than 50 records have been consumed. Every key {@code 1..1000} is expected with
     * value {@code key - 1}.
     */
    @Test
    public void snapshotOnlyWithRestart() throws Exception {
        populateTable();
        final Configuration config = config().build();
        startAndConsumeTillEnd(connectorClass(), config);
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();
        sendAdHocSnapshotSignal();

        final int expectedRecordCount = 1000;
        final AtomicInteger recordCounter = new AtomicInteger();
        final AtomicBoolean restarted = new AtomicBoolean();
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
                expectedRecordCount,
                entry -> true,
                records -> {
                    // Restart exactly once, as soon as the running total exceeds 50 records.
                    if (recordCounter.addAndGet(records.size()) > 50 && !restarted.get()) {
                        stopConnector();
                        assertConnectorNotRunning();
                        start(connectorClass(), config);
                        waitForConnectorToStart();
                        restarted.set(true);
                    }
                });
        for (int pk = 1; pk <= expectedRecordCount; pk++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(pk, pk - 1));
        }
    }

    /**
     * Snapshots all collections from {@code tableDataCollectionIds()} with a chunk size of 200 and
     * restarts the connector once in the middle.
     *
     * <p>Records whose topic is in {@code topicNames()} are collected. The counter is advanced only
     * for records whose topic does not contain {@code topicName()}; the restart happens once that
     * counter exceeds 150, and consumption stops when it equals 2000 (or after 5 consecutive polls
     * without records - see the {@code consumeRecordsUntil} argument). Both topics are then
     * expected to hold keys {@code 1..1000} with value {@code key - 1}. A key appearing twice in
     * one topic makes {@code Collectors.toMap} throw, which fails the test.
     */
    @Test
    @FixFor({"DBZ-7716"})
    public void whenSnapshotMultipleTablesAndConnectorRestartsThenOnlyNotAlreadyProcessedTableMustBeProcessed() throws Exception {
        populateTables();
        final Configuration config = config().with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 200).build();
        startAndConsumeTillEnd(connectorClass(), config);
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();
        sendAdHocSnapshotSignal(tableDataCollectionIds().toArray(new String[0]));

        final int expectedRecordCount = 2000;
        final AtomicInteger recordCounter = new AtomicInteger();
        final AtomicBoolean restarted = new AtomicBoolean();
        final List<SourceRecord> records = new ArrayList<>();
        consumeRecordsUntil(
                (recordsConsumed, record) -> recordCounter.get() == expectedRecordCount,
                (recordsConsumed, record) -> "",
                5,
                record -> {
                    Testing.print("Record counter " + recordCounter.get());
                    if (topicNames().contains(record.topic())) {
                        records.add(record);
                        // Evaluation order matters: the counter is only incremented when the
                        // topic check passes, and the restart flag is only read past 150.
                        if (!record.topic().contains(topicName())
                                && recordCounter.addAndGet(1) > 150
                                && !restarted.get()) {
                            stopConnector();
                            assertConnectorNotRunning();
                            start(connectorClass(), config);
                            waitForConnectorToStart();
                            restarted.set(true);
                        }
                    }
                },
                false);

        final Map<String, List<SourceRecord>> recordsByTopic = records.stream()
                .collect(Collectors.groupingBy(SourceRecord::topic));
        final Function<SourceRecord, Integer> keyOf = record -> ((Struct) record.key()).getInt32(pkFieldName());
        final Function<SourceRecord, Integer> valueOf = record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName());
        final Map<Integer, Integer> firstTopicChanges = recordsByTopic.get(topicNames().get(0)).stream()
                .collect(Collectors.toMap(keyOf, valueOf));
        final Map<Integer, Integer> secondTopicChanges = recordsByTopic.get(topicNames().get(1)).stream()
                .collect(Collectors.toMap(keyOf, valueOf));

        for (int i = 0; i < 1000; i++) {
            Assertions.assertThat(firstTopicChanges).contains(Assertions.entry(i + 1, i));
            Assertions.assertThat(secondTopicChanges).contains(Assertions.entry(i + 1, i));
        }
    }

    /**
     * Runs an incremental snapshot directly after adding a column, then again directly after
     * dropping it. Both snapshots are expected to deliver keys {@code 1..1000} with value
     * {@code key - 1}. Skipped for the SQL Server and Db2 connectors.
     */
    @Test
    @FixFor({"DBZ-4272"})
    @SkipWhenConnectorsUnderTest({@SkipWhenConnectorUnderTest(check = EqualityCheck.EQUAL, value = SkipWhenConnectorUnderTest.Connector.SQL_SERVER), @SkipWhenConnectorUnderTest(check = EqualityCheck.EQUAL, value = SkipWhenConnectorUnderTest.Connector.DB2)})
    public void snapshotPreceededBySchemaChange() throws Exception {
        populateTable();
        startConnector();
        waitForConnectorToStart();

        final int expectedRecordCount = 1000;

        // Each DDL statement gets its own connection, closed even if the statement fails.
        try (JdbcConnection connection = databaseConnection()) {
            connection.execute(alterTableAddColumnStatement(tableName()));
        }
        sendAdHocSnapshotSignal();
        final Map<Integer, Integer> changesAfterAdd = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(changesAfterAdd).contains(Assertions.entry(i + 1, i));
        }

        try (JdbcConnection connection = databaseConnection()) {
            connection.execute(alterTableDropColumnStatement(tableName()));
        }
        sendAdHocSnapshotSignal();
        final Map<Integer, Integer> changesAfterDrop = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(changesAfterDrop).contains(Assertions.entry(i + 1, i));
        }
    }

    /**
     * Sends a snapshot signal whose only data collection is the pattern {@code ".*"} and expects
     * keys {@code 1..1000} with value {@code key - 1}.
     */
    @Test
    public void snapshotWithRegexDataCollections() throws Exception {
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal(".*");

        final int expectedRecordCount = 1000;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int pk = 1; pk <= expectedRecordCount; pk++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(pk, pk - 1));
        }
    }

    /**
     * Sends a snapshot signal naming the same collection twice. Keys {@code 1..1000} are expected
     * with value {@code key - 1}, and a follow-up {@code consumeRecordsByTopic(1, 1)} must return
     * no records for {@code topicName()}.
     */
    @Test
    @FixFor({"DBZ-6945"})
    public void snapshotWithDuplicateDataCollections() throws Exception {
        populateTable();
        startConnector();
        final String collectionId = tableDataCollectionId();
        sendAdHocSnapshotSignal(collectionId, tableDataCollectionId());

        final int expectedRecordCount = 1000;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int pk = 1; pk <= expectedRecordCount; pk++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(pk, pk - 1));
        }

        // A null record list for the topic means nothing further was emitted for it.
        final boolean nothingLeftForTopic = consumeRecordsByTopic(1, 1).recordsForTopic(topicName()) == null;
        Assert.assertTrue(nothingLeftForTopic);
    }

    /**
     * Stops a running incremental snapshot with a stop signal that names no collections, restarts
     * the connector, and verifies that a fresh snapshot then works.
     *
     * <p>Checks, in order: the log contains "Stopping incremental snapshot"; after the restart the
     * log contains "No incremental snapshot in progress"; a new snapshot combined with 1000
     * inserted rows delivers keys {@code 1..2000} with value {@code key - 1}.
     */
    @Test
    @FixFor({"DBZ-4271"})
    public void stopCurrentIncrementalSnapshotWithoutCollectionsAndTakeNewNewIncrementalSnapshotAfterRestart() throws Exception {
        final LogInterceptor interceptor = new LogInterceptor(AbstractIncrementalSnapshotChangeEventSource.class);
        populateTable();
        // Chunk size 1 is the smallest possible chunk for the first snapshot.
        startConnector(builder -> builder.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1));
        sendAdHocSnapshotSignalAndWait(new String[0]);
        sendAdHocSnapshotStopSignalAndWait(new String[0]);
        Assertions.assertThat(consumeAnyRemainingIncrementalSnapshotEventsAndCheckForStopMessage(interceptor, "Stopping incremental snapshot")).isTrue();

        // Clear captured log lines during shutdown so the next assertion only sees the restart.
        stopConnector(running -> interceptor.clear());
        startConnector();
        Assertions.assertThat(interceptor.containsMessage("No incremental snapshot in progress")).isTrue();

        sendAdHocSnapshotSignal();
        // try-with-resources closes the connection exactly once, whether or not an insert fails.
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; i++) {
                connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
                        tableName(), connection.quotedColumnIdString(pkFieldName()), i + 1000 + 1, i + 1000));
            }
            connection.commit();
        }

        final int expectedRecordCount = 2000;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
        }
    }

    /**
     * Stops a running incremental snapshot with a stop signal that names the snapshotted
     * collection, restarts the connector, and verifies that a fresh snapshot then works.
     *
     * <p>Checks, in order: the log contains the "Removing '[...]' collections from incremental
     * snapshot" message for {@code tableDataCollectionId()}; after the restart the log contains
     * "No incremental snapshot in progress"; a new snapshot combined with 1000 inserted rows
     * delivers keys {@code 1..2000} with value {@code key - 1}.
     */
    @Test
    @FixFor({"DBZ-4271"})
    public void stopCurrentIncrementalSnapshotWithAllCollectionsAndTakeNewNewIncrementalSnapshotAfterRestart() throws Exception {
        final LogInterceptor interceptor = new LogInterceptor(AbstractIncrementalSnapshotChangeEventSource.class);
        populateTable();
        // Chunk size 1 is the smallest possible chunk for the first snapshot.
        startConnector(builder -> builder.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1));
        sendAdHocSnapshotSignalAndWait(new String[0]);
        sendAdHocSnapshotStopSignalAndWait(tableDataCollectionId());
        Assertions.assertThat(consumeAnyRemainingIncrementalSnapshotEventsAndCheckForStopMessage(interceptor, "Removing '[" + tableDataCollectionId() + "]' collections from incremental snapshot")).isTrue();

        // Clear captured log lines during shutdown so the next assertion only sees the restart.
        stopConnector(running -> interceptor.clear());
        startConnector();
        Assertions.assertThat(interceptor.containsMessage("No incremental snapshot in progress")).isTrue();

        sendAdHocSnapshotSignal();
        // try-with-resources closes the connection exactly once, whether or not an insert fails.
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; i++) {
                connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
                        tableName(), connection.quotedColumnIdString(pkFieldName()), i + 1000 + 1, i + 1000));
            }
            connection.commit();
        }

        final int expectedRecordCount = 2000;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
        }
    }

    /**
     * Starts a snapshot of two collections, then sends a stop signal for the second one and waits
     * (up to 60 seconds) for the "Removing '[...]' collections from incremental snapshot" log
     * message. Afterwards 1000 rows are inserted into the first table, and its topic is expected to
     * deliver keys {@code 1..2000} with value {@code key - 1}. The chunk size is 250.
     */
    @Test
    @FixFor({"DBZ-4271"})
    public void removeNotYetCapturedCollectionFromInProgressIncrementalSnapshot() throws Exception {
        final LogInterceptor interceptor = new LogInterceptor(AbstractIncrementalSnapshotChangeEventSource.class);
        populateTables();
        startConnector(builder -> builder.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250));

        final List<String> collectionIds = tableDataCollectionIds();
        Assertions.assertThat(collectionIds).hasSize(2);
        final List<String> tableNames = tableNames();
        Assertions.assertThat(tableNames).hasSize(2);
        final List<String> topicNames = topicNames();
        Assertions.assertThat(topicNames).hasSize(2);

        // The second collection is removed; the first table and topic are the ones verified.
        final String collectionIdToRemove = collectionIds.get(1);
        final String tableToSnapshot = tableNames.get(0);
        final String topicToConsume = topicNames.get(0);

        sendAdHocSnapshotSignal(collectionIds.toArray(new String[0]));
        sendAdHocSnapshotStopSignal(collectionIdToRemove);
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(
                () -> interceptor.containsMessage("Removing '[" + collectionIdToRemove + "]' collections from incremental snapshot"));

        // try-with-resources closes the connection exactly once, whether or not an insert fails.
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; i++) {
                connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
                        tableToSnapshot, connection.quotedColumnIdString(pkFieldName()), i + 1000 + 1, i + 1000));
            }
            connection.commit();
        }

        final int expectedRecordCount = 2000;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount, topicToConsume);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
        }
    }

    /**
     * Removes the first data collection from an in-progress incremental snapshot and checks
     * that the second collection is still captured completely.
     *
     * <p>Flow: an ad-hoc snapshot is signalled for both collections, a stop signal is sent for
     * the collection at index 0, and the test waits until the snapshot source logs that the
     * collection was removed. It then inserts 1000 additional rows into the table at index 1 and
     * expects 2000 records on that table's topic, with an entry {@code (i + 1, i)} for every
     * {@code i} in {@code [0, 2000)}.
     *
     * @throws Exception if the connector, the database, or the wait for the log message fails
     */
    @Test
    @FixFor({"DBZ-4271"})
    public void removeStartedCapturedCollectionFromInProgressIncrementalSnapshot() throws Exception {
        final int rowCount = 1000;
        final LogInterceptor interceptor = new LogInterceptor(AbstractIncrementalSnapshotChangeEventSource.class);

        populateTables();
        startConnector(builder -> builder.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250));

        final List<String> collectionIds = tableDataCollectionIds();
        Assertions.assertThat(collectionIds).hasSize(2);
        final List<String> tables = tableNames();
        Assertions.assertThat(tables).hasSize(2);
        final List<String> topics = topicNames();
        Assertions.assertThat(topics).hasSize(2);

        // The first collection is removed; the second one is the one verified below.
        final String collectionIdToRemove = collectionIds.get(0);
        final String tableToVerify = tables.get(1);
        final String topicToVerify = topics.get(1);

        sendAdHocSnapshotSignal(collectionIds.toArray(new String[0]));
        sendAdHocSnapshotStopSignal(collectionIdToRemove);
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(
                () -> interceptor.containsMessage("Removing '[" + collectionIdToRemove + "]' collections from incremental snapshot"));

        // try-with-resources closes the connection exactly once, before verification starts,
        // so a failing assertion below can no longer trigger a second close().
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < rowCount; i++) {
                connection.executeWithoutCommitting(String.format(
                        "INSERT INTO %s (%s, aa) VALUES (%s, %s)",
                        tableToVerify,
                        connection.quotedColumnIdString(pkFieldName()),
                        i + rowCount + 1,
                        i + rowCount));
            }
            connection.commit();
        }

        final int expectedRecordCount = rowCount * 2;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount, topicToVerify);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
        }
    }

    /**
     * Runs the connector twice with different {@code mutableConfig} settings and verifies that
     * an ad-hoc incremental snapshot issued after the restart yields all 1000 rows.
     *
     * <p>The first run uses {@code mutableConfig(true, false)} with an initial snapshot and
     * consumes 1000 records; the second run uses {@code mutableConfig(false, false)}.
     * NOTE(review): judging by the test name the two configurations differ in the table include
     * list; confirm against {@code mutableConfig}.
     *
     * @throws Exception if the connector cannot be started/stopped or records cannot be consumed
     */
    @Test
    @FixFor({"DBZ-4834"})
    public void shouldSnapshotNewlyAddedTableToIncludeListAfterRestart() throws Exception {
        final int rowCount = 1000;

        populateTables();

        // First run: the supplied builder is ignored in favour of the prepared configuration.
        startConnectorWithSnapshot(ignored -> mutableConfig(true, false));
        waitForConnectorToStart();
        consumeRecordsByTopic(rowCount);
        stopConnector();

        // Second run with the alternative configuration.
        startConnector(ignored -> mutableConfig(false, false));
        waitForConnectorToStart();

        sendAdHocSnapshotSignal();
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(rowCount);
        for (int key = 1; key <= rowCount; key++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(key, key - 1));
        }

        stopConnector();
    }

    /**
     * Pauses and resumes a running incremental snapshot and verifies that the records consumed
     * after the resume complete the table.
     *
     * <p>Records arriving on the data topic before the resume signal are only counted; the
     * remainder ({@code 1000 - count}) is consumed after the resume and checked for entries
     * {@code (i + 1, i)}.
     *
     * @throws Exception if the connector cannot be started or records cannot be consumed
     */
    @Test
    public void testPauseDuringSnapshot() throws Exception {
        final int rowCount = 1000;

        populateTable();
        startConnector(builder -> builder.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 50));
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();

        sendAdHocSnapshotSignal();

        // Only the number of data-topic records seen before the resume is needed later.
        final AtomicInteger seenOnDataTopic = new AtomicInteger();
        final String dataTopic = topicName();
        consumeRecords(100, record -> {
            if (dataTopic.equalsIgnoreCase(record.topic())) {
                seenOnDataTopic.incrementAndGet();
            }
        });

        sendPauseSignal();
        // Drain whatever was already in flight when the pause signal was sent.
        consumeAvailableRecords(record -> {
            if (dataTopic.equalsIgnoreCase(record.topic())) {
                seenOnDataTopic.incrementAndGet();
            }
        });

        final int consumedBeforeResume = seenOnDataTopic.get();
        sendResumeSignal();

        final int remaining = rowCount - consumedBeforeResume;
        if (remaining <= 0) {
            return;
        }
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(remaining);
        // NOTE(review): the check starts at consumedBeforeResume + 1, so the entry for index
        // consumedBeforeResume itself is not verified; confirm whether that slack is intended.
        for (int i = consumedBeforeResume + 1; i < rowCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
        }
    }

    /**
     * Triggers an ad-hoc incremental snapshot restricted by the additional condition
     * {@code "aa = 12345678"} and verifies that exactly the 10 matching rows are emitted.
     *
     * <p>The table is first populated normally and then with 10 extra rows carrying the
     * marker value, so only those rows satisfy the condition.
     *
     * @throws Exception if the connector cannot be started or records cannot be consumed
     */
    @Test
    public void snapshotWithAdditionalCondition() throws Exception {
        final int markerValue = 12345678;
        final int markedRows = 10;

        populateTable();
        populateTableWithSpecificValue(2000, markedRows, markerValue);
        waitForCdcTransactionPropagation(3);
        startAndConsumeTillEnd(connectorClass(), config().build());
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();

        // The condition is sent wrapped in double quotes; the surrogate key is left empty.
        final String quotedCondition = String.format("\"aa = %s\"", markerValue);
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(quotedCondition, "", tableDataCollectionId());

        final Map<Integer, SourceRecord> snapshotRecords = consumeRecordsMixedWithIncrementalSnapshot(markedRows, entry -> true, null);
        Assert.assertEquals(markedRows, snapshotRecords.size());
        for (SourceRecord record : snapshotRecords.values()) {
            final Struct after = ((Struct) record.value()).getStruct("after");
            Assert.assertTrue(after.getInt32(valueFieldName()).equals(markerValue));
        }
    }

    /**
     * Same scenario as a snapshot with a single additional condition, but the condition is
     * passed as a map from data collection id to the (unquoted) condition {@code aa = 12345678}.
     * Exactly the 10 rows carrying the marker value are expected.
     *
     * @throws Exception if the connector cannot be started or records cannot be consumed
     */
    @Test
    public void snapshotWithNewAdditionalConditionsField() throws Exception {
        final int markerValue = 12345678;
        final int markedRows = 10;

        populateTable();
        populateTableWithSpecificValue(2000, markedRows, markerValue);
        waitForCdcTransactionPropagation(3);
        startAndConsumeTillEnd(connectorClass(), config().build());
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();

        final String condition = String.format("aa = %s", markerValue);
        sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(
                Map.of(tableDataCollectionId(), condition),
                "",
                tableDataCollectionId());

        final Map<Integer, SourceRecord> snapshotRecords = consumeRecordsMixedWithIncrementalSnapshot(markedRows, entry -> true, null);
        Assert.assertEquals(markedRows, snapshotRecords.size());
        for (SourceRecord record : snapshotRecords.values()) {
            final Struct after = ((Struct) record.value()).getStruct("after");
            Assert.assertTrue(after.getInt32(valueFieldName()).equals(markerValue));
        }
    }

    /**
     * Sends an ad-hoc snapshot signal whose additional condition is an empty quoted string
     * ({@code ""}) and whose surrogate key is empty, and expects all 1000 rows to be emitted.
     *
     * @throws Exception if the connector cannot be started or records cannot be consumed
     */
    @Test
    public void shouldExecuteRegularSnapshotWhenAdditionalConditionEmpty() throws Exception {
        final int rowCount = 1000;

        populateTable();
        startConnector();

        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("\"\"", "", tableDataCollectionId());

        final int consumed = consumeRecordsMixedWithIncrementalSnapshot(rowCount, entry -> true, null).size();
        Assert.assertEquals(1000L, consumed);
    }

    /**
     * Runs a condition-restricted incremental snapshot ({@code "aa = 12345678"}) over 1000
     * matching rows and restarts the connector once while the snapshot is in progress.
     *
     * <p>The restart is performed from the record callback the first time the running total of
     * consumed records exceeds 50. Afterwards all 1000 values must equal the marker value.
     *
     * @throws Exception if the connector cannot be (re)started or records cannot be consumed
     */
    @Test
    public void snapshotWithAdditionalConditionWithRestart() throws Exception {
        final int markerValue = 12345678;
        final int markedRows = 1000;

        populateTable();
        populateTableWithSpecificValue(2000, markedRows, markerValue);
        waitForCdcTransactionPropagation(3);
        final Configuration connectorConfig = config().build();
        startAndConsumeTillEnd(connectorClass(), connectorConfig);
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();

        final String quotedCondition = String.format("\"aa = %s\"", markerValue);
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(quotedCondition, "", tableDataCollectionId());

        final AtomicInteger consumedSoFar = new AtomicInteger();
        final AtomicBoolean restarted = new AtomicBoolean();
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
                markedRows,
                entry -> true,
                batch -> {
                    // The running total is updated for every batch, restart or not.
                    final boolean pastThreshold = consumedSoFar.addAndGet(batch.size()) > 50;
                    if (pastThreshold && !restarted.get()) {
                        stopConnector();
                        assertConnectorNotRunning();
                        start(connectorClass(), connectorConfig);
                        waitForConnectorToStart();
                        restarted.set(true);
                    }
                });

        Assert.assertEquals(markedRows, dbChanges.size());
        Assert.assertTrue(dbChanges.values().stream().allMatch(value -> value.equals(markerValue)));
    }

    /**
     * Triggers an ad-hoc incremental snapshot with an empty additional condition and the
     * quoted surrogate key {@code "aa"}, and expects an entry {@code (k, k - 1)} for every
     * {@code k} in {@code [1, 1000]}.
     *
     * @throws Exception if the connector cannot be started or records cannot be consumed
     */
    @Test
    public void snapshotWithSurrogateKey() throws Exception {
        final int rowCount = 1000;

        populateTable();
        startConnector();

        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "\"aa\"", tableDataCollectionId());

        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(rowCount);
        for (int key = 1; key <= rowCount; key++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(key, key - 1));
        }
    }

    /**
     * Combines an additional condition ({@code "aa = 12345678"}) with the quoted surrogate key
     * {@code "aa"} and verifies that exactly the 10 rows carrying the marker value are emitted.
     *
     * @throws Exception if the connector cannot be started or records cannot be consumed
     */
    @Test
    public void snapshotWithAdditionalConditionWithSurrogateKey() throws Exception {
        final int markerValue = 12345678;
        final int markedRows = 10;

        populateTable();
        populateTableWithSpecificValue(2000, markedRows, markerValue);
        waitForCdcTransactionPropagation(3);
        startAndConsumeTillEnd(connectorClass(), config().build());
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();

        final String quotedCondition = String.format("\"aa = %s\"", markerValue);
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(quotedCondition, "\"aa\"", tableDataCollectionId());

        final Map<Integer, SourceRecord> snapshotRecords = consumeRecordsMixedWithIncrementalSnapshot(markedRows, entry -> true, null);
        Assert.assertEquals(markedRows, snapshotRecords.size());
        for (SourceRecord record : snapshotRecords.values()) {
            final Struct after = ((Struct) record.value()).getStruct("after");
            Assert.assertTrue(after.getInt32(valueFieldName()).equals(markerValue));
        }
    }

    /**
     * Verifies the notifications emitted on the sink channel while an incremental snapshot is
     * started, paused, resumed and completed.
     *
     * <p>The connector is started with the {@code sink} notification channel writing to
     * {@code io.debezium.notification} and with the chunk size returned by
     * {@link #defaultIncrementalSnapshotChunkSize()}. Data and notification records are collected
     * in three phases (first 100 records, whatever is available after the pause signal, and
     * everything up to the {@code COMPLETED} notification) and then checked.
     *
     * @throws Exception if the connector cannot be started or records cannot be consumed
     */
    @Test
    public void testNotification() throws Exception {
        final int rowCount = 1000;
        final String notificationTopic = "io.debezium.notification";

        populateTable();
        startConnector(
                builder -> builder
                        .with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink")
                        .with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, defaultIncrementalSnapshotChunkSize())
                        .with(SinkNotificationChannel.NOTIFICATION_TOPIC, notificationTopic),
                loggingCompletion(),
                false);
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());

        sendAdHocSnapshotSignal();

        // Typed lists: with raw lists the stream lambdas below would see Object elements.
        final List<SourceRecord> snapshotRecords = new ArrayList<>();
        final List<SourceRecord> notifications = new ArrayList<>();
        final String dataTopic = topicName();

        consumeRecords(100, record -> {
            if (dataTopic.equalsIgnoreCase(record.topic())) {
                snapshotRecords.add(record);
            }
            if (notificationTopic.equals(record.topic())) {
                notifications.add(record);
            }
        });

        sendPauseSignal();
        // Drain whatever was already in flight when the pause signal was sent.
        consumeAvailableRecords(record -> {
            if (dataTopic.equalsIgnoreCase(record.topic())) {
                snapshotRecords.add(record);
            }
            if (notificationTopic.equals(record.topic())) {
                notifications.add(record);
            }
        });

        sendResumeSignal();
        final AbstractConnectorTest.SourceRecords untilCompleted = consumeRecordsByTopicUntil(incrementalSnapshotCompleted());
        snapshotRecords.addAll(untilCompleted.recordsForTopic(dataTopic));
        notifications.addAll(untilCompleted.recordsForTopic(notificationTopic));

        final List<Integer> values = snapshotRecords.stream()
                .map(record -> (Struct) record.value())
                .map(getRecordValue())
                .collect(Collectors.toList());
        // Only the first rowCount - 1 values are checked, matching the original bound of 999.
        for (int i = 0; i < rowCount - 1; i++) {
            Assertions.assertThat(values.get(i)).isEqualTo(i);
        }

        assertCorrectIncrementalSnapshotNotification(notifications);
    }

    /**
     * Runs an incremental snapshot with the connector's default watermarking configuration while
     * 1000 rows are inserted concurrently, and verifies that the signal table contains at least
     * one {@code snapshot-window-close} row afterwards.
     *
     * <p>All 2000 rows (1000 pre-populated, 1000 inserted here) must be present as entries
     * {@code (i + 1, i)}.
     *
     * @throws Exception if the connector, the database, or record consumption fails
     */
    @Test
    public void insertInsertWatermarkingStrategy() throws Exception {
        final int rowCount = 1000;

        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();

        // try-with-resources closes the connection exactly once, before verification starts,
        // so a failing assertion below can no longer trigger a second close().
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < rowCount; i++) {
                connection.executeWithoutCommitting(String.format(
                        "INSERT INTO %s (%s, aa) VALUES (%s, %s)",
                        tableName(),
                        connection.quotedColumnIdString(pkFieldName()),
                        i + rowCount + 1,
                        i + rowCount));
            }
            connection.commit();
        }

        final int expectedRecordCount = rowCount * 2;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
        }

        // Window-close markers are expected to remain in the signal table.
        assertOpenCloseEventCount(resultSet -> {
            resultSet.next();
            Assertions.assertThat(resultSet.getInt(1)).isNotZero();
        });
    }

    /**
     * Runs an incremental snapshot with the {@code insert_delete} watermarking strategy (and
     * tombstones on delete disabled) while 1000 rows are inserted concurrently, and verifies
     * that no {@code snapshot-window-close} row is left in the signal table afterwards.
     *
     * <p>All 2000 rows (1000 pre-populated, 1000 inserted here) must be present as entries
     * {@code (i + 1, i)}.
     *
     * @throws Exception if the connector, the database, or record consumption fails
     */
    @Test
    public void insertDeleteWatermarkingStrategy() throws Exception {
        final int rowCount = 1000;

        populateTable();
        startConnector(builder -> builder
                .with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY, "insert_delete")
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false));
        sendAdHocSnapshotSignal();

        // try-with-resources closes the connection exactly once, before verification starts,
        // so a failing assertion below can no longer trigger a second close().
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < rowCount; i++) {
                connection.executeWithoutCommitting(String.format(
                        "INSERT INTO %s (%s, aa) VALUES (%s, %s)",
                        tableName(),
                        connection.quotedColumnIdString(pkFieldName()),
                        i + rowCount + 1,
                        i + rowCount));
            }
            connection.commit();
        }

        final int expectedRecordCount = rowCount * 2;
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int i = 0; i < expectedRecordCount; i++) {
            Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
        }

        // With insert_delete no window-close markers may remain in the signal table.
        assertOpenCloseEventCount(resultSet -> {
            resultSet.next();
            Assertions.assertThat(resultSet.getInt(1)).isZero();
        });
    }

    /**
     * Counts the {@code snapshot-window-close} rows in the signal table and hands the result set
     * to the given consumer, which is expected to perform the actual assertion.
     *
     * <p>Despite the method name, only rows of type {@code snapshot-window-close} are counted.
     *
     * @param resultSetConsumer callback receiving the single-row count result
     * @throws SQLException if opening the connection, running the query, or closing fails
     */
    private void assertOpenCloseEventCount(JdbcConnection.ResultSetConsumer resultSetConsumer) throws SQLException {
        // try-with-resources replaces the manual close/addSuppressed scaffold.
        try (JdbcConnection connection = databaseConnection()) {
            connection.query(
                    "SELECT count(id) from " + signalTableName() + " where type='snapshot-window-close'",
                    resultSetConsumer);
        }
    }

    /**
     * Chunk size that {@code testNotification} configures for the incremental snapshot; the same
     * value is also the expected {@code last_processed_key} of the first {@code IN_PROGRESS}
     * notification checked in {@code assertCorrectIncrementalSnapshotNotification}.
     *
     * <p>Protected and non-final, so connector-specific subclasses can override it.
     *
     * @return the chunk size, {@code 1} in this base class
     */
    protected int defaultIncrementalSnapshotChunkSize() {
        return 1;
    }

    /**
     * Builds a predicate that matches the notification record announcing the end of an
     * incremental snapshot.
     *
     * @return a predicate that is {@code true} for a record on topic
     *         {@code io.debezium.notification} whose value has {@code aggregate_type}
     *         {@code Incremental Snapshot} and {@code type} {@code COMPLETED}; the first
     *         (integer) argument is ignored
     */
    private static BiPredicate<Integer, SourceRecord> incrementalSnapshotCompleted() {
        return (ignoredCount, record) -> {
            // Records from other topics are rejected before their value is inspected.
            if (!record.topic().equals("io.debezium.notification")) {
                return false;
            }
            final Struct notification = (Struct) record.value();
            if (!notification.getString("aggregate_type").equals("Incremental Snapshot")) {
                return false;
            }
            return notification.getString("type").equals("COMPLETED");
        };
    }

    /**
     * Asserts that the given notification records describe a complete incremental snapshot
     * life cycle.
     *
     * <p>Only notifications whose {@code aggregate_type} is {@code Incremental Snapshot} are
     * considered. The method checks that
     * <ul>
     * <li>at least one notification exists for each of the types {@code STARTED},
     * {@code PAUSED}, {@code RESUMED}, {@code IN_PROGRESS}, {@code TABLE_SCAN_COMPLETED} and
     * {@code COMPLETED};</li>
     * <li>one of the notification ids is {@code ad-hoc};</li>
     * <li>the first {@code IN_PROGRESS} notification reports the current collection, a
     * {@code maximum_key} of {@code 1000} and a {@code last_processed_key} equal to
     * {@link #defaultIncrementalSnapshotChunkSize()};</li>
     * <li>the first {@code TABLE_SCAN_COMPLETED} notification reports
     * {@code total_rows_scanned} of {@code 1000}.</li>
     * </ul>
     *
     * @param list the records consumed from the notification topic
     */
    private void assertCorrectIncrementalSnapshotNotification(List<SourceRecord> list) {
        // Typed as List<Struct>: a raw list would turn every lambda parameter below into Object.
        final List<Struct> notifications = list.stream()
                .map(record -> (Struct) record.value())
                .filter(value -> value.getString("aggregate_type").equals("Incremental Snapshot"))
                .collect(Collectors.toList());

        final String[] expectedTypes = {"STARTED", "PAUSED", "RESUMED", "IN_PROGRESS", "TABLE_SCAN_COMPLETED", "COMPLETED"};
        for (String expectedType : expectedTypes) {
            // The description names the missing type, since the failing line alone no longer does.
            Assertions.assertThat(notifications.stream().anyMatch(n -> n.getString("type").equals(expectedType)))
                    .as("notification of type '%s' is present", expectedType)
                    .isTrue();
        }

        final List<String> distinctIds = notifications.stream()
                .map(n -> n.getString("id"))
                .distinct()
                .collect(Collectors.toList());
        Assertions.assertThat(distinctIds).contains("ad-hoc");

        // Presence of both types was asserted above, so get() cannot fail here.
        final Struct firstInProgress = notifications.stream()
                .filter(n -> n.getString("type").equals("IN_PROGRESS"))
                .findFirst()
                .get();
        Assertions.assertThat(firstInProgress.getMap("additional_data"))
                .containsEntry("current_collection_in_progress", tableDataCollectionId())
                .containsEntry("maximum_key", "1000")
                .containsEntry("last_processed_key", String.valueOf(defaultIncrementalSnapshotChunkSize()));

        final Struct firstTableScanCompleted = notifications.stream()
                .filter(n -> n.getString("type").equals("TABLE_SCAN_COMPLETED"))
                .findFirst()
                .get();
        Assertions.assertThat(firstTableScanCompleted.getMap("additional_data"))
                .containsEntry("total_rows_scanned", "1000");
    }

    /**
     * Sends an ad-hoc snapshot signal and blocks (for at most 60 seconds) until a record from a
     * topic ending with the sanitized signal table name has been consumed.
     *
     * <p>Every polling round consumes all currently available records, so records from other
     * topics seen while waiting are discarded.
     *
     * @param strArr data collection ids to snapshot; when empty, the no-argument signal
     *               variant is used
     * @throws Exception if sending the signal fails or the wait times out
     */
    protected void sendAdHocSnapshotSignalAndWait(String... strArr) throws Exception {
        if (strArr.length > 0) {
            sendAdHocSnapshotSignal(strArr);
        } else {
            sendAdHocSnapshotSignal();
        }

        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            // Fresh flag per polling round: only records seen in this round count.
            final AtomicBoolean signalSeen = new AtomicBoolean(false);
            consumeAvailableRecords(record -> {
                if (record.topic().endsWith(signalTableNameSanitized())) {
                    signalSeen.set(true);
                }
            });
            return signalSeen.get();
        });
    }

    /**
     * Sends a stop-snapshot signal and blocks (for at most 60 seconds) until the corresponding
     * signal table record has been consumed.
     *
     * <p>A record matches when its topic ends with the sanitized signal table name, its
     * {@code after} struct is present, and the signal type field of that struct equals
     * {@code stop-snapshot}. Every polling round consumes all currently available records.
     *
     * @param strArr data collection ids passed on to the stop signal
     * @throws Exception if sending the signal fails or the wait times out
     */
    protected void sendAdHocSnapshotStopSignalAndWait(String... strArr) throws Exception {
        sendAdHocSnapshotStopSignal(strArr);

        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            // Fresh flag per polling round: only records seen in this round count.
            final AtomicBoolean stopSignalSeen = new AtomicBoolean(false);
            consumeAvailableRecords(record -> {
                if (!record.topic().endsWith(signalTableNameSanitized())) {
                    return;
                }
                final Struct after = ((Struct) record.value()).getStruct("after");
                if (after != null && "stop-snapshot".equals(after.getString(getSignalTypeFieldName()))) {
                    stopSignalSeen.set(true);
                }
            });
            return stopSignalSeen.get();
        });
    }

    /**
     * Drains remaining records until a polling round finds none, and reports whether the given
     * message was seen in the intercepted log during any round.
     *
     * <p>Polling starts after a 5 second delay, repeats every second, and gives up after
     * 60 seconds.
     *
     * @param logInterceptor interceptor whose captured log output is searched
     * @param str            the log message to look for
     * @return {@code true} if the message was found during at least one polling round
     * @throws Exception if the wait times out or consuming records fails
     */
    protected boolean consumeAnyRemainingIncrementalSnapshotEventsAndCheckForStopMessage(LogInterceptor logInterceptor, String str) throws Exception {
        final AtomicBoolean stopMessageSeen = new AtomicBoolean(false);

        Awaitility.await()
                .atMost(60L, TimeUnit.SECONDS)
                .pollDelay(5L, TimeUnit.SECONDS)
                .pollInterval(1L, TimeUnit.SECONDS)
                .until(() -> {
                    if (logInterceptor.containsMessage(str)) {
                        stopMessageSeen.set(true);
                    }
                    // Records are discarded; only the number drained in this round matters.
                    final int drained = consumeAvailableRecords(record -> {
                    });
                    return drained == 0;
                });

        return stopMessageSeen.get();
    }
}
