package io.debezium.connector.vitess;

import io.debezium.config.Configuration;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.embedded.KafkaConnectUtil;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.Callback;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/vitess/VitessConnectorTaskTest.class */
public class VitessConnectorTaskTest {
    private static final LogInterceptor logInterceptor = new LogInterceptor(BaseSourceTask.class);
    private static final LogInterceptor vitessLogInterceptor = new LogInterceptor(VitessConnectorTask.class);

    /* loaded from: input_file:io/debezium/connector/vitess/VitessConnectorTaskTest$ContextHelper.class */
    static class ContextHelper {
        String engineName = "testOffset";
        OffsetBackingStore offsetStore = KafkaConnectUtil.memoryOffsetBackingStore();
        SourceTaskContext sourceTaskContext = initSourceTaskContext();

        public SourceTaskContext getSourceTaskContext() {
            return this.sourceTaskContext;
        }

        private SourceTaskContext initSourceTaskContext() {
            this.offsetStore.start();
            JsonConverter jsonConverter = new JsonConverter();
            Map hashMapOf = Collect.hashMapOf("schemas.enable", false);
            jsonConverter.configure(hashMapOf, true);
            JsonConverter jsonConverter2 = new JsonConverter();
            jsonConverter2.configure(hashMapOf, false);
            final OffsetStorageReaderImpl offsetStorageReaderImpl = new OffsetStorageReaderImpl(this.offsetStore, this.engineName, jsonConverter, jsonConverter2);
            return new SourceTaskContext() { // from class: io.debezium.connector.vitess.VitessConnectorTaskTest.ContextHelper.1
                public Map<String, String> configs() {
                    return Collections.emptyMap();
                }

                public OffsetStorageReader offsetStorageReader() {
                    return offsetStorageReaderImpl;
                }
            };
        }

        public void storeOffsets(String str, Map<String, Object> map) {
            if (str == null && (map == null || map.isEmpty())) {
                Testing.print("Empty gtids to store to offset.");
                return;
            }
            JsonConverter jsonConverter = new JsonConverter();
            Map hashMapOf = Collect.hashMapOf("schemas.enable", false);
            jsonConverter.configure(hashMapOf, true);
            JsonConverter jsonConverter2 = new JsonConverter();
            jsonConverter2.configure(hashMapOf, false);
            OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(this.offsetStore, this.engineName, jsonConverter, jsonConverter2);
            if (str != null) {
                Testing.print(String.format("Server vgtids: %s", str));
                offsetStorageWriter.offset(Collect.hashMapOf("server", "test_server"), Collect.hashMapOf("vgtid", str));
            }
            if (map != null) {
                Testing.print(String.format("Previous vgtids: %s", map));
                for (String str2 : map.keySet()) {
                    offsetStorageWriter.offset(Collect.hashMapOf("server", "test_server", "task_key", str2), Collect.hashMapOf("vgtid", map.get(str2)));
                }
            }
            offsetStorageWriter.beginFlush();
            try {
                offsetStorageWriter.doFlush((Callback) null).get(100L, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                Assert.fail(e.getMessage());
            }
        }
    }

    @Test
    public void shouldStartWithTaskOffsetStorageEnabledAndNoOffsets() {
        Configuration build = TestHelper.defaultConfig(false, true, 1, 0, 1, null, VitessConnectorConfig.SnapshotMode.NEVER).with("vitess.task.key", "task0_0_1").with("vitess.total.tasks", 1).with("vitess.task.shards", TestHelper.TEST_SHARD).build();
        VitessConnectorTask vitessConnectorTask = new VitessConnectorTask();
        vitessConnectorTask.initialize(new ContextHelper().getSourceTaskContext());
        vitessConnectorTask.start(build);
        Assertions.assertThat(vitessLogInterceptor.containsMessage("Using offsets from config")).isTrue();
    }

    @Test
    public void shouldStartWithTaskOffsetStorageDisabledAndNoOffsets() {
        Configuration build = TestHelper.defaultConfig().with("vitess.total.tasks", 1).with("vitess.task.shards", TestHelper.TEST_SHARD).build();
        VitessConnectorTask vitessConnectorTask = new VitessConnectorTask();
        vitessConnectorTask.initialize(new ContextHelper().getSourceTaskContext());
        vitessConnectorTask.start(build);
        Assertions.assertThat(vitessLogInterceptor.containsMessage("No previous offset found")).isTrue();
    }

    @Test
    public void shouldReadOffsetsWhenTaskOffsetStorageDisabled() {
        Configuration build = TestHelper.defaultConfig().with("vitess.total.tasks", 1).with("vitess.task.shards", TestHelper.TEST_SHARD).build();
        VitessConnectorTask vitessConnectorTask = new VitessConnectorTask();
        ContextHelper contextHelper = new ContextHelper();
        contextHelper.storeOffsets(VgtidTest.VGTID_JSON, null);
        vitessConnectorTask.initialize(contextHelper.getSourceTaskContext());
        vitessConnectorTask.start(build);
        Assertions.assertThat(logInterceptor.containsMessage(String.format("Found previous partition offset VitessPartition [sourcePartition={server=test_server}]: {vgtid=%s}", VgtidTest.VGTID_JSON))).isTrue();
    }

    @Test
    public void shouldReadCurrentGenOffsets() {
        String taskKeyName = VitessConnector.getTaskKeyName(0, 1, 0);
        Configuration build = TestHelper.defaultConfig(true, true, 1, 0, 1, null, VitessConnectorConfig.SnapshotMode.NEVER).with("vitess.task.key", taskKeyName).with("vitess.total.tasks", 1).with("vitess.task.shards", "-80,80-").build();
        VitessConnectorTask vitessConnectorTask = new VitessConnectorTask();
        ContextHelper contextHelper = new ContextHelper();
        contextHelper.storeOffsets(null, Map.of(taskKeyName, VgtidTest.VGTID_JSON));
        vitessConnectorTask.initialize(contextHelper.getSourceTaskContext());
        vitessConnectorTask.start(build);
        Assertions.assertThat(vitessLogInterceptor.containsMessage("Using offsets from current gen")).isTrue();
    }

    @Test
    public void shouldReadPreviousGenOffsets() {
        String taskKeyName = VitessConnector.getTaskKeyName(0, 1, 0);
        ContextHelper contextHelper = new ContextHelper();
        contextHelper.storeOffsets(null, Map.of(taskKeyName, VgtidTest.VGTID_JSON));
        Configuration build = TestHelper.defaultConfig(true, true, 2, 1, 1, null, VitessConnectorConfig.SnapshotMode.NEVER, "-80,80-").with("vitess.task.key", VitessConnector.getTaskKeyName(0, 2, 1)).with("vitess.task.shards", "-80,80-").with("vitess.total.tasks", 2).build();
        VitessConnectorTask vitessConnectorTask = new VitessConnectorTask();
        vitessConnectorTask.initialize(contextHelper.getSourceTaskContext());
        vitessConnectorTask.start(build);
        Assertions.assertThat(vitessLogInterceptor.containsMessage("Using offsets from previous gen")).isTrue();
    }

    @Test
    public void shouldReadConfiguredOffsets() {
        Configuration build = TestHelper.defaultConfig(true, true, 1, 0, 1, null, VitessConnectorConfig.SnapshotMode.NEVER, "-80,80-").with("vitess.task.key", VitessConnector.getTaskKeyName(0, 1, 0)).with("vitess.task.shards", "-80,80-").with("vitess.total.tasks", 1).with(VitessConnectorConfig.VGTID, VgtidTest.VGTID_JSON).build();
        VitessConnectorTask vitessConnectorTask = new VitessConnectorTask();
        vitessConnectorTask.initialize(new ContextHelper().getSourceTaskContext());
        vitessConnectorTask.start(build);
        Assertions.assertThat(vitessLogInterceptor.containsMessage("Using offsets from config")).isTrue();
    }
}
