package io.debezium.pipeline.source.snapshot.incremental;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.fest.assertions.IntAssert;
import org.fest.assertions.MapAssert;
import org.junit.Test;

/* loaded from: input_file:io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.class */
public abstract class AbstractIncrementalSnapshotTest<T extends SourceConnector> extends AbstractConnectorTest {
    private static final int ROW_COUNT = 1000;
    private static final int MAXIMUM_NO_RECORDS_CONSUMES = 2;
    protected static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-is.txt").toAbsolutePath();

    protected abstract Class<T> connectorClass();

    protected abstract JdbcConnection databaseConnection();

    protected abstract String topicName();

    protected abstract String tableName();

    protected abstract String signalTableName();

    protected abstract Configuration.Builder config();

    protected void populateTable(JdbcConnection jdbcConnection) throws SQLException {
        jdbcConnection.setAutoCommit(false);
        for (int i = 0; i < ROW_COUNT; i++) {
            jdbcConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)", tableName(), Integer.valueOf(i + 1), Integer.valueOf(i))});
        }
        jdbcConnection.commit();
    }

    protected void populateTable() throws SQLException {
        JdbcConnection databaseConnection = databaseConnection();
        try {
            populateTable(databaseConnection);
            if (databaseConnection != null) {
                databaseConnection.close();
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int i) throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(i, entry -> {
            return true;
        }, null);
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int i, Predicate<Map.Entry<Integer, Integer>> predicate, Consumer<List<SourceRecord>> consumer) throws InterruptedException {
        HashMap hashMap = new HashMap();
        int i2 = 0;
        while (true) {
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
            List<SourceRecord> recordsForTopic = consumeRecordsByTopic.recordsForTopic(topicName());
            if (consumeRecordsByTopic.allRecordsInOrder().isEmpty()) {
                i2++;
                ((IntAssert) Assertions.assertThat(i2).describedAs("Too many no data record results")).isLessThanOrEqualTo(MAXIMUM_NO_RECORDS_CONSUMES);
            } else {
                i2 = 0;
                if (recordsForTopic != null && !recordsForTopic.isEmpty()) {
                    recordsForTopic.forEach(sourceRecord -> {
                        hashMap.put(Integer.valueOf(((Struct) sourceRecord.key()).getInt32(pkFieldName()).intValue()), Integer.valueOf(((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName()).intValue()));
                    });
                    if (consumer != null) {
                        consumer.accept(recordsForTopic);
                    }
                    if (hashMap.size() >= i && !hashMap.entrySet().stream().anyMatch(predicate.negate())) {
                        Assertions.assertThat(hashMap).hasSize(i);
                        return hashMap;
                    }
                }
            }
        }
    }

    protected String valueFieldName() {
        return "aa";
    }

    protected String pkFieldName() {
        return "pk";
    }

    protected void sendAdHocSnapshotSignal() throws SQLException {
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.execute(new String[]{String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"%s\"]}')", signalTableName(), tableName())});
            if (databaseConnection != null) {
                databaseConnection.close();
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> function) {
        start(connectorClass(), function.apply(config()).build());
        waitForConnectorToStart();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        assertNoRecordsToConsume();
    }

    protected void startConnector() {
        startConnector(Function.identity());
    }

    protected void waitForConnectorToStart() {
        assertConnectorIsRunning();
    }

    @Test
    public void snapshotOnly() throws Exception {
        Testing.Print.enable();
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();
        Map<Integer, Integer> consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(ROW_COUNT);
        for (int i = 0; i < ROW_COUNT; i++) {
            Assertions.assertThat(consumeMixedWithIncrementalSnapshot).includes(new MapAssert.Entry[]{MapAssert.entry(Integer.valueOf(i + 1), Integer.valueOf(i))});
        }
    }

    @Test
    public void inserts() throws Exception {
        Testing.Print.enable();
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.setAutoCommit(false);
            for (int i = 0; i < ROW_COUNT; i++) {
                databaseConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)", tableName(), Integer.valueOf(i + ROW_COUNT + 1), Integer.valueOf(i + ROW_COUNT))});
            }
            databaseConnection.commit();
            if (databaseConnection != null) {
                databaseConnection.close();
            }
            Map<Integer, Integer> consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(2000);
            for (int i2 = 0; i2 < 2000; i2++) {
                Assertions.assertThat(consumeMixedWithIncrementalSnapshot).includes(new MapAssert.Entry[]{MapAssert.entry(Integer.valueOf(i2 + 1), Integer.valueOf(i2))});
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void updates() throws Exception {
        Testing.Print.enable();
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.setAutoCommit(false);
            for (int i = 0; i < ROW_COUNT; i++) {
                databaseConnection.executeWithoutCommitting(new String[]{String.format("UPDATE %s SET aa = aa + 2000 WHERE pk > %s AND pk <= %s", tableName(), Integer.valueOf(i * 10), Integer.valueOf((i + 1) * 10))});
                databaseConnection.commit();
            }
            if (databaseConnection != null) {
                databaseConnection.close();
            }
            Map<Integer, Integer> consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(ROW_COUNT, entry -> {
                return ((Integer) entry.getValue()).intValue() >= 2000;
            }, null);
            for (int i2 = 0; i2 < ROW_COUNT; i2++) {
                Assertions.assertThat(consumeMixedWithIncrementalSnapshot).includes(new MapAssert.Entry[]{MapAssert.entry(Integer.valueOf(i2 + 1), Integer.valueOf(i2 + 2000))});
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void updatesWithRestart() throws Exception {
        Testing.Print.enable();
        populateTable();
        Configuration build = config().build();
        startAndConsumeTillEnd(connectorClass(), build);
        waitForConnectorToStart();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        assertNoRecordsToConsume();
        sendAdHocSnapshotSignal();
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.setAutoCommit(false);
            for (int i = 0; i < ROW_COUNT; i++) {
                databaseConnection.executeWithoutCommitting(new String[]{String.format("UPDATE %s SET aa = aa + 2000 WHERE pk > %s AND pk <= %s", tableName(), Integer.valueOf(i * 10), Integer.valueOf((i + 1) * 10))});
                databaseConnection.commit();
            }
            if (databaseConnection != null) {
                databaseConnection.close();
            }
            AtomicInteger atomicInteger = new AtomicInteger();
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            Map<Integer, Integer> consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(ROW_COUNT, entry -> {
                return ((Integer) entry.getValue()).intValue() >= 2000;
            }, list -> {
                if (atomicInteger.addAndGet(list.size()) <= 50 || atomicBoolean.get()) {
                    return;
                }
                stopConnector();
                assertConnectorNotRunning();
                start(connectorClass(), build);
                waitForConnectorToStart();
                atomicBoolean.set(true);
            });
            for (int i2 = 0; i2 < ROW_COUNT; i2++) {
                Assertions.assertThat(consumeMixedWithIncrementalSnapshot).includes(new MapAssert.Entry[]{MapAssert.entry(Integer.valueOf(i2 + 1), Integer.valueOf(i2 + 2000))});
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void updatesLargeChunk() throws Exception {
        Testing.Print.enable();
        populateTable();
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, ROW_COUNT);
        });
        sendAdHocSnapshotSignal();
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.execute(new String[]{String.format("UPDATE %s SET aa = aa + 2000", tableName())});
            if (databaseConnection != null) {
                databaseConnection.close();
            }
            Map<Integer, Integer> consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(ROW_COUNT, entry -> {
                return ((Integer) entry.getValue()).intValue() >= 2000;
            }, null);
            for (int i = 0; i < ROW_COUNT; i++) {
                Assertions.assertThat(consumeMixedWithIncrementalSnapshot).includes(new MapAssert.Entry[]{MapAssert.entry(Integer.valueOf(i + 1), Integer.valueOf(i + 2000))});
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void snapshotOnlyWithRestart() throws Exception {
        Testing.Print.enable();
        populateTable();
        Configuration build = config().build();
        startAndConsumeTillEnd(connectorClass(), build);
        waitForConnectorToStart();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        assertNoRecordsToConsume();
        sendAdHocSnapshotSignal();
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Map<Integer, Integer> consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(ROW_COUNT, entry -> {
            return true;
        }, list -> {
            if (atomicInteger.addAndGet(list.size()) <= 50 || atomicBoolean.get()) {
                return;
            }
            stopConnector();
            assertConnectorNotRunning();
            start(connectorClass(), build);
            waitForConnectorToStart();
            atomicBoolean.set(true);
        });
        for (int i = 0; i < ROW_COUNT; i++) {
            Assertions.assertThat(consumeMixedWithIncrementalSnapshot).includes(new MapAssert.Entry[]{MapAssert.entry(Integer.valueOf(i + 1), Integer.valueOf(i))});
        }
    }

    @Override // io.debezium.embedded.AbstractConnectorTest
    protected int getMaximumEnqueuedRecordCount() {
        return 3000;
    }
}
