package io.trino.plugin.kafka.schema.confluent;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.Duration;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.trino.plugin.kafka.KafkaTopicDescription;
import io.trino.plugin.kafka.KafkaTopicFieldDescription;
import io.trino.plugin.kafka.KafkaTopicFieldGroup;
import io.trino.plugin.kafka.schema.TableDescriptionSupplier;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.TestingTypeManager;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
import io.trino.testing.TestingConnectorSession;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.class */
public class TestConfluentSchemaRegistryTableDescriptionSupplier {
    private static final String DEFAULT_NAME = "tests";
    private static final SchemaRegistryClient SCHEMA_REGISTRY_CLIENT = new MockSchemaRegistryClient();

    @Test
    public void testTopicDescription() throws Exception {
        TableDescriptionSupplier createTableDescriptionSupplier = createTableDescriptionSupplier();
        SchemaTableName schemaTableName = new SchemaTableName(DEFAULT_NAME, "simple_topic");
        String str = "simple_topic" + "-value";
        SCHEMA_REGISTRY_CLIENT.register(str, getAvroSchema(schemaTableName.getTableName(), ""));
        Assertions.assertThat(createTableDescriptionSupplier.listTables()).contains(new SchemaTableName[]{schemaTableName});
        Assertions.assertThat(getKafkaTopicDescription(createTableDescriptionSupplier, schemaTableName)).isEqualTo(new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), schemaTableName.getTableName(), Optional.empty(), Optional.of(getTopicFieldGroup(str, ImmutableList.of(getFieldDescription("col1", IntegerType.INTEGER), getFieldDescription("col2", VarcharType.createUnboundedVarcharType()))))));
    }

    @Test
    public void testCaseInsensitiveSubjectMapping() throws Exception {
        TableDescriptionSupplier createTableDescriptionSupplier = createTableDescriptionSupplier();
        SchemaTableName schemaTableName = new SchemaTableName(DEFAULT_NAME, "camelCase_Topic");
        String str = "camelCase_Topic" + "-key";
        SCHEMA_REGISTRY_CLIENT.register(str, getAvroSchema(schemaTableName.getTableName(), ""));
        Assertions.assertThat(createTableDescriptionSupplier.listTables()).contains(new SchemaTableName[]{schemaTableName});
        Assertions.assertThat(getKafkaTopicDescription(createTableDescriptionSupplier, schemaTableName)).isEqualTo(new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), "camelCase_Topic", Optional.of(getTopicFieldGroup(str, ImmutableList.of(getFieldDescription("col1", IntegerType.INTEGER), getFieldDescription("col2", VarcharType.createUnboundedVarcharType())))), Optional.empty()));
    }

    @Test
    public void testAmbiguousSubject() throws Exception {
        TableDescriptionSupplier createTableDescriptionSupplier = createTableDescriptionSupplier();
        SchemaTableName schemaTableName = new SchemaTableName(DEFAULT_NAME, "topic_one");
        SCHEMA_REGISTRY_CLIENT.register("topic_one" + "-key", getAvroSchema(schemaTableName.getTableName(), ""));
        SCHEMA_REGISTRY_CLIENT.register("topic_one".toUpperCase(Locale.ENGLISH) + "-key", getAvroSchema(schemaTableName.getTableName(), ""));
        Assertions.assertThat(createTableDescriptionSupplier.listTables()).contains(new SchemaTableName[]{schemaTableName});
        Assertions.assertThatThrownBy(() -> {
            createTableDescriptionSupplier.getTopicDescription(TestingConnectorSession.builder().setPropertyMetadata(new ConfluentSessionProperties(new ConfluentSchemaRegistryConfig()).getSessionProperties()).build(), schemaTableName);
        }).isInstanceOf(TrinoException.class).hasMessage("Unable to access 'topic_one' table. Subject is ambiguous, and may refer to one of the following: TOPIC_ONE, topic_one");
    }

    @Test
    public void testOverriddenSubject() throws Exception {
        TableDescriptionSupplier createTableDescriptionSupplier = createTableDescriptionSupplier();
        SchemaTableName schemaTableName = new SchemaTableName(DEFAULT_NAME, "base_Topic");
        String str = "base_Topic" + "-value";
        SCHEMA_REGISTRY_CLIENT.register(str, getAvroSchema(schemaTableName.getTableName(), ""));
        SCHEMA_REGISTRY_CLIENT.register("overriddenSubject", getAvroSchema(schemaTableName.getTableName(), "overridden_"));
        Assertions.assertThat(createTableDescriptionSupplier.listTables()).contains(new SchemaTableName[]{schemaTableName});
        Assertions.assertThat(getKafkaTopicDescription(createTableDescriptionSupplier, schemaTableName)).isEqualTo(new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), "base_Topic", Optional.empty(), Optional.of(getTopicFieldGroup(str, ImmutableList.of(getFieldDescription("col1", IntegerType.INTEGER), getFieldDescription("col2", VarcharType.createUnboundedVarcharType()))))));
        Assertions.assertThat(getKafkaTopicDescription(createTableDescriptionSupplier, new SchemaTableName(DEFAULT_NAME, String.format("%s&value-subject=%s", "base_Topic", "overriddenSubject")))).isEqualTo(new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), "base_Topic", Optional.empty(), Optional.of(getTopicFieldGroup("overriddenSubject", ImmutableList.of(getFieldDescription("overridden_col1", IntegerType.INTEGER), getFieldDescription("overridden_col2", VarcharType.createUnboundedVarcharType()))))));
    }

    @Test
    public void testAmbiguousOverriddenSubject() throws Exception {
        TableDescriptionSupplier createTableDescriptionSupplier = createTableDescriptionSupplier();
        String str = "base_Topic";
        SchemaTableName schemaTableName = new SchemaTableName(DEFAULT_NAME, "base_Topic");
        String str2 = "ambiguousOverriddenSubject";
        SCHEMA_REGISTRY_CLIENT.register("ambiguousOverriddenSubject", getAvroSchema(schemaTableName.getTableName(), "overridden_"));
        SCHEMA_REGISTRY_CLIENT.register("ambiguousOverriddenSubject".toUpperCase(Locale.ENGLISH), getAvroSchema(schemaTableName.getTableName(), "overridden_"));
        Assertions.assertThatThrownBy(() -> {
            createTableDescriptionSupplier.getTopicDescription(TestingConnectorSession.builder().setPropertyMetadata(new ConfluentSessionProperties(new ConfluentSchemaRegistryConfig()).getSessionProperties()).build(), new SchemaTableName(DEFAULT_NAME, String.format("%s&value-subject=%s", str, str2)));
        }).isInstanceOf(TrinoException.class).hasMessage("Subject 'ambiguousoverriddensubject' is ambiguous, and may refer to one of the following: AMBIGUOUSOVERRIDDENSUBJECT, ambiguousOverriddenSubject");
    }

    private KafkaTopicDescription getKafkaTopicDescription(TableDescriptionSupplier tableDescriptionSupplier, SchemaTableName schemaTableName) {
        return (KafkaTopicDescription) tableDescriptionSupplier.getTopicDescription(TestingConnectorSession.builder().setPropertyMetadata(new ConfluentSessionProperties(new ConfluentSchemaRegistryConfig()).getSessionProperties()).build(), schemaTableName).orElseThrow();
    }

    private TableDescriptionSupplier createTableDescriptionSupplier() {
        return new ConfluentSchemaRegistryTableDescriptionSupplier(SCHEMA_REGISTRY_CLIENT, ImmutableMap.of("AVRO", new AvroSchemaParser(new TestingTypeManager())), DEFAULT_NAME, new Duration(1.0d, TimeUnit.SECONDS));
    }

    private static AvroSchema getAvroSchema(String str, String str2) {
        return new AvroSchema((Schema) SchemaBuilder.record(str).fields().name(str2 + "col1").type().intType().noDefault().name(str2 + "col2").type().stringType().noDefault().endRecord());
    }

    private static KafkaTopicFieldGroup getTopicFieldGroup(String str, List<KafkaTopicFieldDescription> list) {
        return new KafkaTopicFieldGroup("avro", Optional.empty(), Optional.of(str), list);
    }

    private static KafkaTopicFieldDescription getFieldDescription(String str, Type type) {
        return new KafkaTopicFieldDescription(str, type, str, (String) null, (String) null, (String) null, false);
    }
}
