package io.debezium.connector.dse;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.CassandraConnectorConfig;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.ChangeRecord;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.OffsetPosition;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.SnapshotProcessor;
import io.debezium.connector.cassandra.TestUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/debezium/connector/dse/SnapshotProcessorTest.class */
/**
 * Tests for {@link SnapshotProcessor} using the DSE connector test base.
 *
 * <p>The table-oriented tests create tables and rows through the context's Cassandra client and
 * then inspect the first change-event queue of the context. The snapshot-mode tests stub out
 * {@code snapshot()} on a Mockito spy and only count how often {@code process()} triggers it.
 *
 * <p>Every test releases its context (and, where tables were created, the test keyspace tables
 * and offsets) in a {@code finally} block so that a failing assertion does not leak state into
 * the tests that run afterwards.
 */
public class SnapshotProcessorTest extends DseConnectorTestBase {

    /** Number of rows inserted into a table by the data-driven tests. */
    private static final int ROWS_PER_TABLE = 5;

    /** Number of times {@code process()} is invoked by the snapshot-mode tests. */
    private static final int PROCESS_INVOCATIONS = 5;

    /**
     * Snapshots two CDC-enabled tables and checks that every row of both tables is emitted as an
     * INSERT change event flagged as a snapshot record with the default offset position.
     */
    @Test
    public void testSnapshotTable() throws Exception {
        CassandraConnectorContext context = generateTaskContext();
        try {
            SnapshotProcessor snapshotProcessor = runningProcessorSpy(context);
            createTable(context, "cdc_table", true);
            createTable(context, "cdc_table2", true);
            insertRows(context, "cdc_table", 0);
            insertRows(context, "cdc_table2", 10);

            ChangeEventQueue<Event> queue = context.getQueues().get(0);
            Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());
            snapshotProcessor.process();
            Assert.assertEquals(2 * ROWS_PER_TABLE, queue.totalCapacity() - queue.remainingCapacity());

            String firstTableName = TestUtils.keyspaceTable("cdc_table");
            int firstTableRecords = 0;
            int secondTableRecords = 0;
            for (Event event : queue.poll()) {
                ChangeRecord changeRecord = (ChangeRecord) event;
                Assert.assertEquals(Event.EventType.CHANGE_EVENT, changeRecord.getEventType());
                Assert.assertEquals(Record.Operation.INSERT, changeRecord.getOp());
                Assert.assertEquals(DatabaseDescriptor.getClusterName(), changeRecord.getSource().cluster);
                Assert.assertTrue(changeRecord.getSource().snapshot);
                Assert.assertEquals(OffsetPosition.defaultOffsetPosition(), changeRecord.getSource().offsetPosition);
                // Anything that is not from cdc_table is attributed to cdc_table2.
                if (changeRecord.getSource().keyspaceTable.name().equals(firstTableName)) {
                    firstTableRecords++;
                } else {
                    secondTableRecords++;
                }
            }
            Assert.assertEquals(ROWS_PER_TABLE, firstTableRecords);
            Assert.assertEquals(ROWS_PER_TABLE, secondTableRecords);
        } finally {
            dropTestData(context);
        }
    }

    /**
     * Checks that processing a table created with {@code cdc = false} leaves the queue empty.
     */
    @Test
    public void testSnapshotSkipsNonCdcEnabledTable() throws Exception {
        CassandraConnectorContext context = generateTaskContext();
        try {
            SnapshotProcessor snapshotProcessor = runningProcessorSpy(context);
            createTable(context, "non_cdc_table", false);
            insertRows(context, "non_cdc_table", 0);

            ChangeEventQueue<Event> queue = context.getQueues().get(0);
            Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());
            snapshotProcessor.process();
            Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());
        } finally {
            dropTestData(context);
        }
    }

    /**
     * Checks that an empty CDC-enabled table produces no events, and that rows inserted after the
     * first {@code process()} call are still not emitted by a second call.
     */
    @Test
    public void testSnapshotEmptyTable() throws Exception {
        CassandraConnectorContext context = generateTaskContext();
        try {
            SnapshotProcessor snapshotProcessor = runningProcessorSpy(context);
            createTable(context, "cdc_table", true);

            ChangeEventQueue<Event> queue = context.getQueues().get(0);
            Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());
            snapshotProcessor.process();
            Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());

            insertRows(context, "cdc_table", 0);
            snapshotProcessor.process();
            // NOTE(review): presumably the table was already recorded as snapshotted by the first
            // process() call, so the late rows are left to another processor - confirm.
            Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());
        } finally {
            dropTestData(context);
        }
    }

    /** With snapshot mode {@code always}, every {@code process()} call must trigger a snapshot. */
    @Test
    public void testSnapshotModeAlways() throws Exception {
        HashMap<String, Object> configs = TestUtils.propertiesForContext();
        configs.put("kafka.producer.bootstrap.servers", "localhost:9092");
        assertSnapshotCount(configs, "always", PROCESS_INVOCATIONS);
    }

    /** With snapshot mode {@code initial}, only one snapshot is taken across repeated calls. */
    @Test
    public void testSnapshotModeInitial() throws Exception {
        assertSnapshotCount(TestUtils.propertiesForContext(), "initial", 1);
    }

    /** With snapshot mode {@code never}, {@code process()} must not trigger any snapshot. */
    @Test
    public void testSnapshotModeNever() throws Exception {
        assertSnapshotCount(TestUtils.propertiesForContext(), "never", 0);
    }

    /**
     * Creates a spy of a {@link SnapshotProcessor} whose {@code isRunning()} always reports true.
     *
     * <p>{@code doReturn(..).when(spy)} is used instead of {@code when(spy.isRunning())} so that
     * stubbing does not invoke the real method on the spied object.
     */
    private SnapshotProcessor runningProcessorSpy(CassandraConnectorContext context) throws Exception {
        SnapshotProcessor spy = Mockito.spy(new SnapshotProcessor(context, new DseTypeProvider().getClusterName()));
        Mockito.doReturn(true).when(spy).isRunning();
        return spy;
    }

    /** Creates the two-column test table {@code (a int, b text)} with the given CDC setting. */
    private static void createTable(CassandraConnectorContext context, String table, boolean cdcEnabled) throws Exception {
        context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + TestUtils.keyspaceTable(table)
                + " (a int, b text, PRIMARY KEY(a)) WITH cdc = " + cdcEnabled + ";");
    }

    /**
     * Inserts {@link #ROWS_PER_TABLE} rows with consecutive keys starting at {@code firstKey};
     * column {@code b} holds the string form of the key.
     */
    private static void insertRows(CassandraConnectorContext context, String table, int firstKey) throws Exception {
        String insert = "INSERT INTO " + TestUtils.keyspaceTable(table) + "(a, b) VALUES (?, ?)";
        for (int key = firstKey; key < firstKey + ROWS_PER_TABLE; key++) {
            context.getCassandraClient().execute(insert, new Object[]{key, String.valueOf(key)});
        }
    }

    /**
     * Builds a context for the given snapshot mode with a poll interval of {@code "0"}, calls
     * {@code process()} {@link #PROCESS_INVOCATIONS} times on a spy whose {@code snapshot()} is
     * stubbed to do nothing, and verifies how many times {@code snapshot()} was invoked.
     *
     * @param configs           base connector properties; the snapshot settings are added to it
     * @param snapshotMode      value for the snapshot-mode property
     * @param expectedSnapshots exact number of expected {@code snapshot()} invocations
     *                          ({@code times(0)} is what {@code Mockito.never()} stands for)
     */
    private void assertSnapshotCount(HashMap<String, Object> configs, String snapshotMode, int expectedSnapshots) throws Exception {
        configs.put(CassandraConnectorConfig.SNAPSHOT_MODE.name(), snapshotMode);
        configs.put(CassandraConnectorConfig.SNAPSHOT_POLL_INTERVAL_MS.name(), "0");
        CassandraConnectorContext context = generateTaskContext(configs);
        try {
            SnapshotProcessor snapshotProcessor = Mockito.spy(new SnapshotProcessor(context, new DseTypeProvider().getClusterName()));
            Mockito.doNothing().when(snapshotProcessor).snapshot();
            for (int i = 0; i < PROCESS_INVOCATIONS; i++) {
                snapshotProcessor.process();
            }
            Mockito.verify(snapshotProcessor, Mockito.times(expectedSnapshots)).snapshot();
        } finally {
            context.cleanUp();
        }
    }

    /**
     * Removes the test keyspace tables and offsets, then releases the context. The context is
     * cleaned up even if removing the tables or offsets fails.
     */
    private static void dropTestData(CassandraConnectorContext context) throws Exception {
        try {
            TestUtils.deleteTestKeyspaceTables();
            TestUtils.deleteTestOffsets(context);
        } finally {
            context.cleanUp();
        }
    }
}
