package io.openlineage.flink.visitor.wrapper;

import io.openlineage.client.OpenLineage;
import io.openlineage.flink.api.OpenLineageContext;
import io.openlineage.flink.shaded.org.apache.commons.lang3.reflect.TypeUtils;
import io.openlineage.flink.utils.AvroSchemaUtils;
import io.openlineage.flink.utils.ProtobufUtils;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openlineage/flink/visitor/wrapper/KafkaSinkWrapper.class */
public class KafkaSinkWrapper {
    private static final Logger log = LoggerFactory.getLogger(KafkaSinkWrapper.class);
    private final KafkaSink kafkaSink;
    private final KafkaRecordSerializationSchema serializationSchema;
    private final OpenLineageContext context;

    KafkaSinkWrapper(KafkaSink kafkaSink, OpenLineageContext openLineageContext) {
        this.kafkaSink = kafkaSink;
        this.context = openLineageContext;
        this.serializationSchema = (KafkaRecordSerializationSchema) WrapperUtils.getFieldValue(KafkaSink.class, kafkaSink, "recordSerializer").get();
        log.debug("SerializationSchema is null: {}", Boolean.valueOf(this.serializationSchema == null));
    }

    public static KafkaSinkWrapper of(KafkaSink kafkaSink, OpenLineageContext openLineageContext) {
        return new KafkaSinkWrapper(kafkaSink, openLineageContext);
    }

    public Properties getKafkaProducerConfig() {
        return (Properties) WrapperUtils.getFieldValue(KafkaSink.class, this.kafkaSink, "kafkaProducerConfig").get();
    }

    public List<String> getTopicsOfMultiTopicSink() {
        return (List) Optional.of(this.serializationSchema).filter(kafkaRecordSerializationSchema -> {
            return kafkaRecordSerializationSchema instanceof KafkaTopicsDescriptor;
        }).map(kafkaRecordSerializationSchema2 -> {
            return (KafkaTopicsDescriptor) kafkaRecordSerializationSchema2;
        }).filter(kafkaTopicsDescriptor -> {
            return kafkaTopicsDescriptor.isFixedTopics();
        }).map(kafkaTopicsDescriptor2 -> {
            return kafkaTopicsDescriptor2.getFixedTopics();
        }).orElse(Collections.emptyList());
    }

    public Optional<SerializationSchema> getSchemaOfMultiTopicSink() {
        return Arrays.stream(this.serializationSchema.getClass().getGenericInterfaces()).filter(type -> {
            return TypeUtils.isAssignable(type, (Type) KafkaRecordSerializationSchema.class);
        }).findFirst().filter(type2 -> {
            return type2 instanceof ParameterizedType;
        }).map(type3 -> {
            return (ParameterizedType) type3;
        }).map(parameterizedType -> {
            return parameterizedType.getActualTypeArguments();
        }).filter(typeArr -> {
            return typeArr != null || typeArr.length > 0;
        }).map(typeArr2 -> {
            return typeArr2[0];
        }).filter(type4 -> {
            return type4 instanceof Class;
        }).map(type5 -> {
            return AvroSerializationSchema.forSpecific((Class) type5);
        });
    }

    public String getKafkaTopic() throws IllegalAccessException {
        log.debug("Extracting Kafka topic from: {}", this.serializationSchema);
        Optional fieldValue = WrapperUtils.getFieldValue(this.serializationSchema.getClass(), this.serializationSchema, "topicSelector");
        if (fieldValue.isPresent()) {
            return (String) ((Function) WrapperUtils.getFieldValue(((Function) fieldValue.get()).getClass(), fieldValue.get(), "topicSelector").get()).apply(null);
        }
        Optional fieldValue2 = WrapperUtils.getFieldValue(this.serializationSchema.getClass(), this.serializationSchema, "topic");
        return fieldValue2.isPresent() ? (String) fieldValue2.get() : "";
    }

    public Optional<OpenLineage.SchemaDatasetFacet> getSchemaFacet() {
        Optional fieldValue = WrapperUtils.getFieldValue(this.serializationSchema.getClass(), this.serializationSchema, "valueSerializationSchema");
        if (!fieldValue.isPresent()) {
            log.debug("ValueSerializationSchema is not present when extracting schema facet");
            return AvroUtils.getAvroSchema(WrapperUtils.getFieldValue(this.serializationSchema.getClass(), this.serializationSchema, "valueSerialization")).map(schema -> {
                return AvroSchemaUtils.convert(this.context.getOpenLineage(), schema);
            });
        }
        log.debug("ValueSerializationSchema is present when extracting schema facet: {}", fieldValue.get());
        if (fieldValue.get() instanceof AvroSerializationSchema) {
            log.debug("Extracting AvroSchema from {}", fieldValue.get());
            return AvroUtils.getAvroSchema(fieldValue).map(schema2 -> {
                return AvroSchemaUtils.convert(this.context.getOpenLineage(), schema2);
            });
        }
        if (ProtobufUtils.isProtobufSerializationSchema((SerializationSchema) fieldValue.get())) {
            log.debug("Extracting Protobuf schema from {}", fieldValue.get());
            return ProtobufUtils.convert(this.context.getOpenLineage(), (SerializationSchema) fieldValue.get());
        }
        log.warn("Unsupported valueSerializationSchema {}", this.serializationSchema);
        return Optional.empty();
    }
}
