package io.openlineage.flink.visitor.wrapper;

import io.openlineage.client.OpenLineage;
import io.openlineage.flink.api.OpenLineageContext;
import io.openlineage.flink.shaded.org.apache.commons.lang3.reflect.FieldUtils;
import io.openlineage.flink.utils.AvroSchemaUtils;
import io.openlineage.flink.utils.ProtobufUtils;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openlineage/flink/visitor/wrapper/KafkaSourceWrapper.class */
/**
 * Reflection-based accessor for a Flink {@link KafkaSource}.
 *
 * <p>Reads the source's private {@code subscriber}, {@code props} and {@code deserializationSchema}
 * fields in order to expose the subscribed topics and, where the value deserializer is an Avro or
 * Protobuf schema, an OpenLineage schema facet.
 */
public class KafkaSourceWrapper {
    private static final Logger log = LoggerFactory.getLogger(KafkaSourceWrapper.class);

    // The wrapper classes are referenced by name and resolved with Class.forName at the point of use.
    private static final String VALUE_ONLY_DESERIALIZATION_SCHEMA_WRAPPER_CLASS = "org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper";
    private static final String DESERIALIZATION_SCHEMA_WRAPPER_CLASS = "org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper";
    private static final String DYNAMIC_DESERIALIZATION_SCHEMA_CLASS = "org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema";

    private final OpenLineageContext context;
    private final KafkaSource kafkaSource;
    private final KafkaSubscriber kafkaSubscriber;

    private KafkaSourceWrapper(KafkaSource kafkaSource, KafkaSubscriber kafkaSubscriber, OpenLineageContext openLineageContext) {
        this.kafkaSource = kafkaSource;
        this.kafkaSubscriber = kafkaSubscriber;
        this.context = openLineageContext;
    }

    /**
     * Creates a wrapper, reading the {@code subscriber} field of the given source reflectively.
     *
     * @param kafkaSource the source to wrap
     * @param openLineageContext context supplying the {@link OpenLineage} factory used for facets
     * @return a wrapper bound to {@code kafkaSource}
     * @throws IllegalAccessException if the {@code subscriber} field cannot be read
     */
    public static KafkaSourceWrapper of(KafkaSource kafkaSource, OpenLineageContext openLineageContext) throws IllegalAccessException {
        KafkaSubscriber subscriber = (KafkaSubscriber) FieldUtils.getField(KafkaSource.class, "subscriber", true).get(kafkaSource);
        return new KafkaSourceWrapper(kafkaSource, subscriber, openLineageContext);
    }

    /**
     * @return the subscriber captured from the wrapped source at construction time
     */
    public KafkaSubscriber getSubscriber() {
        return this.kafkaSubscriber;
    }

    /**
     * Reads the {@code props} field of the wrapped source.
     *
     * @return the properties held by the source
     * @throws IllegalAccessException declared for callers; kept for interface compatibility
     */
    public Properties getProps() throws IllegalAccessException {
        return (Properties) WrapperUtils.getFieldValue(KafkaSource.class, this.kafkaSource, "props").get();
    }

    /**
     * Resolves the topics this source reads from.
     *
     * <p>If the subscriber has a {@code topics} field, its value is returned as is. Otherwise, if it
     * has a {@code topicPattern} field, a {@link KafkaPartitionDiscoverer} is used to list all topics
     * and those matching the pattern are returned. With neither field present the result is empty.
     *
     * @return the topic names, possibly empty
     * @throws IllegalAccessException if the source properties cannot be read
     */
    @SuppressWarnings("unchecked") // values come from reflection; element types are fixed by the field/method read
    public List<String> getTopics() throws IllegalAccessException {
        Optional<?> explicitTopics = WrapperUtils.getFieldValue(this.kafkaSubscriber.getClass(), this.kafkaSubscriber, "topics");
        if (explicitTopics.isPresent()) {
            return (List<String>) explicitTopics.get();
        }

        Optional<?> topicPattern = WrapperUtils.getFieldValue(this.kafkaSubscriber.getClass(), this.kafkaSubscriber, "topicPattern");
        if (!topicPattern.isPresent()) {
            return Collections.emptyList();
        }

        KafkaTopicsDescriptor descriptor = new KafkaTopicsDescriptor((List<String>) null, (Pattern) topicPattern.get());
        KafkaPartitionDiscoverer discoverer = new KafkaPartitionDiscoverer(descriptor, 0, 0, getProps());
        try {
            WrapperUtils.invoke(KafkaPartitionDiscoverer.class, discoverer, "initializeConnections");
            Optional<?> allTopics = WrapperUtils.invoke(KafkaPartitionDiscoverer.class, discoverer, "getAllTopics");
            if (!allTopics.isPresent()) {
                log.warn("Could not list Kafka topics for pattern {}", topicPattern.get());
                return Collections.emptyList();
            }
            return ((List<String>) allTopics.get()).stream()
                    .filter(descriptor::isMatchingTopic)
                    .collect(Collectors.toList());
        } finally {
            // Release whatever initializeConnections opened; previously it was never closed.
            WrapperUtils.invoke(KafkaPartitionDiscoverer.class, discoverer, "closeConnections");
        }
    }

    /**
     * Reads the {@code deserializationSchema} field of the wrapped source.
     *
     * @return the record deserialization schema configured on the source
     * @throws IllegalAccessException declared for callers; kept for interface compatibility
     */
    public KafkaRecordDeserializationSchema getDeserializationSchema() throws IllegalAccessException {
        return (KafkaRecordDeserializationSchema) WrapperUtils.getFieldValue(KafkaSource.class, this.kafkaSource, "deserializationSchema").get();
    }

    /**
     * Builds a schema facet from the value deserializer of the wrapped source, when one can be found.
     *
     * <p>Two shapes are unwrapped: a value-only wrapper holding a {@code deserializationSchema}
     * field, and a Kafka deserialization wrapper whose {@code kafkaDeserializationSchema} is a
     * dynamic table schema holding a {@code valueDeserialization} field.
     *
     * @return the facet, or empty if the schema is of an unsupported kind or cannot be read
     */
    public Optional<OpenLineage.SchemaDatasetFacet> getSchemaFacet() {
        try {
            KafkaRecordDeserializationSchema deserializationSchema = getDeserializationSchema();
            log.debug("Deserialization schema is {} when extracting schema facet for Kafka source", deserializationSchema);

            // Each class is resolved only in the branch that needs it, so a missing optional
            // class does not prevent handling of the other shapes.
            Class<?> valueOnlyWrapperClass = Class.forName(VALUE_ONLY_DESERIALIZATION_SCHEMA_WRAPPER_CLASS);
            if (valueOnlyWrapperClass.isInstance(deserializationSchema)) {
                return convertIfPresent(WrapperUtils.getFieldValue(valueOnlyWrapperClass, deserializationSchema, "deserializationSchema"));
            }

            Class<?> wrapperClass = Class.forName(DESERIALIZATION_SCHEMA_WRAPPER_CLASS);
            if (wrapperClass.isInstance(deserializationSchema)) {
                Optional<?> kafkaSchema = WrapperUtils.getFieldValue(wrapperClass, deserializationSchema, "kafkaDeserializationSchema");
                if (kafkaSchema.isPresent()) {
                    Class<?> dynamicSchemaClass = Class.forName(DYNAMIC_DESERIALIZATION_SCHEMA_CLASS);
                    // Only the dynamic table schema declares the field read below.
                    if (dynamicSchemaClass.isInstance(kafkaSchema.get())) {
                        return convertIfPresent(WrapperUtils.getFieldValue(dynamicSchemaClass, kafkaSchema.get(), "valueDeserialization"));
                    }
                }
            }
            return Optional.empty();
        } catch (ClassNotFoundException | IllegalAccessException e) {
            log.error("Cannot extract Avro schema: ", e);
            return Optional.empty();
        }
    }

    /** Converts a reflectively obtained value if it is present and is a {@link DeserializationSchema}. */
    private Optional<OpenLineage.SchemaDatasetFacet> convertIfPresent(Optional<?> candidate) {
        if (candidate.isPresent() && candidate.get() instanceof DeserializationSchema) {
            return convert((DeserializationSchema) candidate.get());
        }
        return Optional.empty();
    }

    /** Dispatches on the deserializer kind: Avro, Avro row data, or Protobuf; anything else yields empty. */
    private Optional<OpenLineage.SchemaDatasetFacet> convert(DeserializationSchema deserializationSchema) {
        if (deserializationSchema instanceof AvroDeserializationSchema) {
            return convert(((AvroDeserializationSchema) deserializationSchema).getProducedType());
        }
        if (deserializationSchema instanceof AvroRowDataDeserializationSchema) {
            log.debug("Extracting Avro schema {}", deserializationSchema);
            return convert(((AvroRowDataDeserializationSchema) deserializationSchema).getProducedType());
        }
        if (!ProtobufUtils.isProtobufDeserializationSchema(deserializationSchema)) {
            return Optional.empty();
        }
        log.debug("Extracting Protobuf schema {}", deserializationSchema);
        return ProtobufUtils.convert(this.context.getOpenLineage(), deserializationSchema);
    }

    /**
     * Derives a facet from Avro type information: for {@link GenericRecord} the schema is read from
     * the type information's {@code schema} field, otherwise from the produced class's static
     * {@code getClassSchema} method.
     */
    private Optional<OpenLineage.SchemaDatasetFacet> convert(TypeInformation<?> typeInformation) {
        Class<?> typeClass = typeInformation.getTypeClass();
        // Constant-first comparison so a null type class falls through to the empty Optional below.
        if (GenericRecord.class.equals(typeClass)) {
            return WrapperUtils.getFieldValue(typeInformation.getClass(), typeInformation, "schema").map(schema -> {
                return AvroSchemaUtils.convert(this.context.getOpenLineage(), schema);
            });
        }
        return Optional.ofNullable(typeClass).flatMap(cls -> {
            return WrapperUtils.invokeStatic(cls, "getClassSchema");
        }).map(schema -> {
            return AvroSchemaUtils.convert(this.context.getOpenLineage(), schema);
        });
    }

    /**
     * @return the same subscriber as {@link #getSubscriber()}; kept as a separate accessor for existing callers
     */
    public KafkaSubscriber getKafkaSubscriber() {
        return this.kafkaSubscriber;
    }
}
