package io.trino.plugin.kafka.schema.confluent;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.UnmodifiableIterator;
import io.airlift.units.Duration;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.trino.plugin.kafka.KafkaConfig;
import io.trino.plugin.kafka.KafkaTopicDescription;
import io.trino.plugin.kafka.KafkaTopicFieldGroup;
import io.trino.plugin.kafka.schema.TableDescriptionSupplier;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import javax.inject.Inject;
import javax.inject.Provider;

/* loaded from: input_file:io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.class */
public class ConfluentSchemaRegistryTableDescriptionSupplier implements TableDescriptionSupplier {
    public static final String NAME = "confluent";
    private static final String KEY_VALUE_PAIR_DELIMITER = "&";
    private static final String KEY_VALUE_DELIMITER = "=";
    private static final String KEY_SUBJECT = "key-subject";
    private static final String VALUE_SUBJECT = "value-subject";
    private static final String KEY_SUFFIX = "-key";
    private static final String VALUE_SUFFIX = "-value";
    private final SchemaRegistryClient schemaRegistryClient;
    private final Map<String, SchemaParser> schemaParsers;
    private final String defaultSchema;
    private final Supplier<Map<String, TopicAndSubjects>> topicAndSubjectsSupplier;
    private final Supplier<Map<String, String>> subjectsSupplier;

    /* loaded from: input_file:io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier$Factory.class */
    public static class Factory implements Provider<TableDescriptionSupplier> {
        private final SchemaRegistryClient schemaRegistryClient;
        private final Map<String, SchemaParser> schemaParsers;
        private final String defaultSchema;
        private final Duration subjectsCacheRefreshInterval;

        @Inject
        public Factory(SchemaRegistryClient schemaRegistryClient, Map<String, SchemaParser> map, KafkaConfig kafkaConfig, ConfluentSchemaRegistryConfig confluentSchemaRegistryConfig) {
            this.schemaRegistryClient = (SchemaRegistryClient) Objects.requireNonNull(schemaRegistryClient, "schemaRegistryClient is null");
            this.schemaParsers = ImmutableMap.copyOf((Map) Objects.requireNonNull(map, "schemaParsers is null"));
            Objects.requireNonNull(kafkaConfig, "kafkaConfig is null");
            this.defaultSchema = kafkaConfig.getDefaultSchema();
            Objects.requireNonNull(confluentSchemaRegistryConfig, "confluentConfig is null");
            this.subjectsCacheRefreshInterval = confluentSchemaRegistryConfig.getConfluentSubjectsCacheRefreshInterval();
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public TableDescriptionSupplier m18get() {
            return new ConfluentSchemaRegistryTableDescriptionSupplier(this.schemaRegistryClient, this.schemaParsers, this.defaultSchema, this.subjectsCacheRefreshInterval);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier$TopicAndSubjects.class */
    public static class TopicAndSubjects {
        private final Optional<String> keySubject;
        private final Optional<String> valueSubject;
        private final String topic;

        public TopicAndSubjects(String str, Optional<String> optional, Optional<String> optional2) {
            this.topic = (String) Objects.requireNonNull(str, "topic is null");
            this.keySubject = (Optional) Objects.requireNonNull(optional, "keySubject is null");
            this.valueSubject = (Optional) Objects.requireNonNull(optional2, "valueSubject is null");
        }

        public String getTableName() {
            return this.topic.toLowerCase(Locale.ENGLISH);
        }

        public String getTopic() {
            return this.topic;
        }

        public Optional<String> getKeySubject() {
            return this.keySubject;
        }

        public Optional<String> getValueSubject() {
            return this.valueSubject;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof TopicAndSubjects)) {
                return false;
            }
            TopicAndSubjects topicAndSubjects = (TopicAndSubjects) obj;
            return this.topic.equals(topicAndSubjects.topic) && this.keySubject.equals(topicAndSubjects.keySubject) && this.valueSubject.equals(topicAndSubjects.valueSubject);
        }

        public int hashCode() {
            return Objects.hash(this.topic, this.keySubject, this.valueSubject);
        }
    }

    public ConfluentSchemaRegistryTableDescriptionSupplier(SchemaRegistryClient schemaRegistryClient, Map<String, SchemaParser> map, String str, Duration duration) {
        this.schemaRegistryClient = (SchemaRegistryClient) Objects.requireNonNull(schemaRegistryClient, "schemaRegistryClient is null");
        this.schemaParsers = ImmutableMap.copyOf((Map) Objects.requireNonNull(map, "schemaParsers is null"));
        this.defaultSchema = (String) Objects.requireNonNull(str, "defaultSchema is null");
        this.topicAndSubjectsSupplier = Suppliers.memoizeWithExpiration(this::getTopicAndSubjects, duration.toMillis(), TimeUnit.MILLISECONDS);
        this.subjectsSupplier = Suppliers.memoizeWithExpiration(this::getAllSubjects, duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    private String resolveSubject(String str) {
        String str2 = this.subjectsSupplier.get().get(str);
        Preconditions.checkState(str2 != null, "Subject '%s' not found", str);
        return str2;
    }

    private Map<String, String> getAllSubjects() {
        try {
            return (Map) this.schemaRegistryClient.getAllSubjects().stream().collect(ImmutableMap.toImmutableMap(str -> {
                return str.toLowerCase(Locale.ENGLISH);
            }, UnaryOperator.identity()));
        } catch (IOException | RestClientException e) {
            throw new RuntimeException("Failed to retrieve subjects from schema registry", e);
        }
    }

    private Map<String, TopicAndSubjects> getTopicAndSubjects() {
        ImmutableSetMultimap.Builder builder = ImmutableSetMultimap.builder();
        for (String str : this.subjectsSupplier.get().values()) {
            if (isValidSubject(str)) {
                builder.put(extractTopicFromSubject(str), str);
            }
        }
        ImmutableMap.Builder builder2 = ImmutableMap.builder();
        UnmodifiableIterator it = builder.build().asMap().entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            String str2 = (String) entry.getKey();
            TopicAndSubjects topicAndSubjects = new TopicAndSubjects(str2, getKeySubjectFromTopic(str2, (Collection) entry.getValue()), getValueSubjectFromTopic(str2, (Collection) entry.getValue()));
            builder2.put(topicAndSubjects.getTableName(), topicAndSubjects);
        }
        return builder2.build();
    }

    @Override // io.trino.plugin.kafka.schema.TableDescriptionSupplier
    public Optional<KafkaTopicDescription> getTopicDescription(ConnectorSession connectorSession, SchemaTableName schemaTableName) {
        Objects.requireNonNull(schemaTableName, "schemaTableName is null");
        TopicAndSubjects parseTopicAndSubjects = parseTopicAndSubjects(schemaTableName);
        String tableName = parseTopicAndSubjects.getTableName();
        Optional ofNullable = Optional.ofNullable(this.topicAndSubjectsSupplier.get().get(tableName));
        String str = (String) ofNullable.map((v0) -> {
            return v0.getTopic();
        }).orElse(parseTopicAndSubjects.getTopic());
        Optional or = parseTopicAndSubjects.getKeySubject().map(this::resolveSubject).or(() -> {
            return ofNullable.flatMap((v0) -> {
                return v0.getKeySubject();
            });
        });
        Optional or2 = parseTopicAndSubjects.getValueSubject().map(this::resolveSubject).or(() -> {
            return ofNullable.flatMap((v0) -> {
                return v0.getValueSubject();
            });
        });
        if (or.isEmpty() && or2.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new KafkaTopicDescription(tableName, Optional.of(schemaTableName.getSchemaName()), str, or.map(str2 -> {
            return getFieldGroup(connectorSession, str2);
        }), or2.map(str3 -> {
            return getFieldGroup(connectorSession, str3);
        })));
    }

    private KafkaTopicFieldGroup getFieldGroup(ConnectorSession connectorSession, String str) {
        SchemaMetadata latestSchemaMetadata = getLatestSchemaMetadata(str);
        SchemaParser schemaParser = this.schemaParsers.get(latestSchemaMetadata.getSchemaType());
        if (schemaParser == null) {
            throw new TrinoException(StandardErrorCode.NOT_SUPPORTED, "Not supported schema: " + latestSchemaMetadata.getSchemaType());
        }
        return schemaParser.parse(connectorSession, str, latestSchemaMetadata.getSchema());
    }

    private SchemaMetadata getLatestSchemaMetadata(String str) {
        try {
            return this.schemaRegistryClient.getLatestSchemaMetadata(str);
        } catch (IOException | RestClientException e) {
            throw new RuntimeException(String.format("Unable to get field group for '%s' subject", str), e);
        }
    }

    private static TopicAndSubjects parseTopicAndSubjects(SchemaTableName schemaTableName) {
        List splitToList = Splitter.on(KEY_VALUE_PAIR_DELIMITER).trimResults().splitToList(schemaTableName.getTableName());
        Preconditions.checkState(!splitToList.isEmpty() && splitToList.size() <= 3, "Unexpected format for encodedTableName. Expected format is <tableName>[&key-subject=<key subject>][&value-subject=<value subject>]");
        String str = (String) splitToList.get(0);
        Optional empty = Optional.empty();
        Optional empty2 = Optional.empty();
        for (int i = 1; i < splitToList.size(); i++) {
            List splitToList2 = Splitter.on(KEY_VALUE_DELIMITER).trimResults().splitToList((CharSequence) splitToList.get(i));
            Preconditions.checkState(splitToList2.size() == 2 && (((String) splitToList2.get(0)).equals(KEY_SUBJECT) || ((String) splitToList2.get(0)).equals(VALUE_SUBJECT)), "Unexpected parameter '%s', should be %s=<key subject>' or %s=<value subject>", splitToList.get(i), KEY_SUBJECT, VALUE_SUBJECT);
            if (((String) splitToList2.get(0)).equals(KEY_SUBJECT)) {
                Preconditions.checkState(empty.isEmpty(), "Key subject already defined");
                empty = Optional.of((String) splitToList2.get(1));
            } else {
                Preconditions.checkState(empty2.isEmpty(), "Value subject already defined");
                empty2 = Optional.of((String) splitToList2.get(1));
            }
        }
        return new TopicAndSubjects(str, empty, empty2);
    }

    @Override // io.trino.plugin.kafka.schema.TableDescriptionSupplier
    public Set<SchemaTableName> listTables() {
        return (Set) this.topicAndSubjectsSupplier.get().keySet().stream().map(str -> {
            return new SchemaTableName(this.defaultSchema, str);
        }).collect(ImmutableSet.toImmutableSet());
    }

    private static boolean isValidSubject(String str) {
        Objects.requireNonNull(str, "subject is null");
        return str.endsWith(VALUE_SUFFIX) || str.endsWith(KEY_SUFFIX);
    }

    private static String extractTopicFromSubject(String str) {
        Objects.requireNonNull(str, "subject is null");
        if (str.endsWith(VALUE_SUFFIX)) {
            return str.substring(0, str.length() - VALUE_SUFFIX.length());
        }
        Preconditions.checkState(str.endsWith(KEY_SUFFIX), "Unexpected subject name %s", str);
        return str.substring(0, str.length() - KEY_SUFFIX.length());
    }

    private static Optional<String> getKeySubjectFromTopic(String str, Collection<String> collection) {
        String str2 = str + "-key";
        return collection.contains(str2) ? Optional.of(str2) : Optional.empty();
    }

    private static Optional<String> getValueSubjectFromTopic(String str, Collection<String> collection) {
        String str2 = str + "-value";
        return collection.contains(str2) ? Optional.of(str2) : Optional.empty();
    }
}
