package io.debezium.transforms;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.transforms.partitions.PartitionRouting;
import io.debezium.util.BoundedConcurrentHashMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/transforms/HeaderToValue.class */
public class HeaderToValue<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final String MOVE_OPERATION = "move";
    private static final String COPY_OPERATION = "copy";
    private static final int CACHE_SIZE = 64;
    public static final String NESTING_SEPARATOR = ".";
    public static final String ROOT_FIELD_NAME = "payload";
    private List<String> fields;
    private List<String> headers;
    private Operation operation;
    private final BoundedConcurrentHashMap<Schema, Schema> schemaUpdateCache = new BoundedConcurrentHashMap<>(64);
    private final BoundedConcurrentHashMap<Headers, Headers> headersUpdateCache = new BoundedConcurrentHashMap<>(64);
    private static final Logger LOGGER = LoggerFactory.getLogger(HeaderToValue.class);
    public static final String HEADERS_CONF = "headers";
    public static final Field HEADERS_FIELD = Field.create(HEADERS_CONF).withDisplayName("Header names list").withType(ConfigDef.Type.LIST).withImportance(ConfigDef.Importance.HIGH).withValidation(Field::notContainSpaceInAnyElement, Field::notContainEmptyElements).withDescription("Header names in the record whose values are to be copied or moved to record value.").required();
    public static final String FIELDS_CONF = "fields";
    public static final Field FIELDS_FIELD = Field.create(FIELDS_CONF).withDisplayName("Field names list").withType(ConfigDef.Type.LIST).withImportance(ConfigDef.Importance.HIGH).withValidation(Field::notContainSpaceInAnyElement, Field::notContainEmptyElements).withDescription("Field names, in the same order as the header names listed in the headers configuration property. Supports Struct nesting using dot notation.").required();
    public static final String OPERATION_CONF = "operation";
    public static final Field OPERATION_FIELD = Field.create(OPERATION_CONF).withDisplayName("Operation: mover or copy").withType(ConfigDef.Type.STRING).withEnum(Operation.class).withImportance(ConfigDef.Importance.HIGH).withDescription("Either <code>move</code> if the fields are to be moved to the value (removed from the headers), or <code>copy</code> if the fields are to be copied to the value (retained in the headers).").required();

    /* loaded from: input_file:io/debezium/transforms/HeaderToValue$Operation.class */
    enum Operation {
        MOVE(HeaderToValue.MOVE_OPERATION),
        COPY(HeaderToValue.COPY_OPERATION);

        private final String name;

        Operation(String str) {
            this.name = str;
        }

        static Operation fromName(String str) {
            boolean z = -1;
            switch (str.hashCode()) {
                case 3059573:
                    if (str.equals(HeaderToValue.COPY_OPERATION)) {
                        z = true;
                        break;
                    }
                    break;
                case 3357649:
                    if (str.equals(HeaderToValue.MOVE_OPERATION)) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return MOVE;
                case true:
                    return COPY;
                default:
                    throw new IllegalArgumentException();
            }
        }

        @Override // java.lang.Enum
        public String toString() {
            return this.name;
        }
    }

    public ConfigDef config() {
        ConfigDef configDef = new ConfigDef();
        Field.group(configDef, null, HEADERS_FIELD, FIELDS_FIELD, OPERATION_FIELD);
        return configDef;
    }

    public void configure(Map<String, ?> map) {
        Configuration from = Configuration.from(map);
        new SmtManager(from).validate(from, Field.setOf(FIELDS_FIELD, HEADERS_FIELD, OPERATION_FIELD));
        this.fields = from.getList(FIELDS_FIELD);
        this.headers = from.getList(HEADERS_FIELD);
        validateConfiguration();
        this.operation = Operation.fromName(from.getString(OPERATION_FIELD));
    }

    private void validateConfiguration() {
        if (this.headers.size() != this.fields.size()) {
            throw new ConfigException(String.format("'%s' config must have the same number of elements as '%s' config.", FIELDS_FIELD, HEADERS_FIELD));
        }
    }

    public R apply(R r) {
        if (r.value() == null) {
            LOGGER.trace("Tombstone {} arrived and will be skipped", r.key());
            return r;
        }
        Struct requireStruct = Requirements.requireStruct(r.value(), "Header field insertion");
        LOGGER.trace("Processing record {}", requireStruct);
        Map<?, ?> map = (Map) StreamSupport.stream(r.headers().spliterator(), false).filter(header -> {
            return this.headers.contains(header.key());
        }).collect(Collectors.toMap((v0) -> {
            return v0.key();
        }, Function.identity()));
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Header to be processed: {}", headersToString(map));
        }
        if (map.isEmpty()) {
            return r;
        }
        Schema computeIfAbsent = this.schemaUpdateCache.computeIfAbsent(requireStruct.schema(), schema -> {
            return makeNewSchema(schema, map);
        });
        LOGGER.trace("Updated schema fields: {}", computeIfAbsent.fields());
        Struct makeUpdatedValue = makeUpdatedValue(requireStruct, map, computeIfAbsent);
        LOGGER.trace("Updated value: {}", makeUpdatedValue);
        Headers headers = r.headers();
        if (Operation.MOVE.equals(this.operation)) {
            headers = this.headersUpdateCache.computeIfAbsent(r.headers(), this::removeHeaders);
        }
        return (R) r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(), computeIfAbsent, makeUpdatedValue, r.timestamp(), headers);
    }

    private Headers removeHeaders(Headers headers) {
        Headers duplicate = headers.duplicate();
        List<String> list = this.headers;
        Objects.requireNonNull(duplicate);
        list.forEach(duplicate::remove);
        return duplicate;
    }

    private Struct makeUpdatedValue(Struct struct, Map<String, Header> map, Schema schema) {
        return buildUpdatedValue("payload", struct, map, schema, (List) this.fields.stream().filter(str -> {
            return str.contains(NESTING_SEPARATOR);
        }).collect(Collectors.toList()), 0);
    }

    private Struct buildUpdatedValue(String str, Struct struct, Map<String, Header> map, Schema schema, List<String> list, int i) {
        Struct struct2 = new Struct(schema);
        for (org.apache.kafka.connect.data.Field field : struct.schema().fields()) {
            if (struct.get(field) != null) {
                if (isContainedIn(field.name(), list)) {
                    i++;
                    struct2.put(field.name(), buildUpdatedValue(field.name(), Requirements.requireStruct(struct.get(field), "Nested field"), map, schema.field(field.name()).schema(), list, i));
                } else {
                    struct2.put(field.name(), struct.get(field));
                }
            }
        }
        for (int i2 = 0; i2 < this.headers.size(); i2++) {
            Header header = map.get(this.headers.get(i2));
            if (header != null) {
                getFieldName(this.fields.get(i2), str, i).ifPresent(str2 -> {
                    struct2.put(str2, header.value());
                });
            }
        }
        return struct2;
    }

    private boolean isContainedIn(String str, List<String> list) {
        return list.stream().anyMatch(str2 -> {
            return str2.contains(str);
        });
    }

    private Schema makeNewSchema(Schema schema, Map<String, Header> map) {
        return buildNewSchema("payload", schema, map, (List) this.fields.stream().filter(str -> {
            return str.contains(NESTING_SEPARATOR);
        }).collect(Collectors.toList()), 0);
    }

    private Schema buildNewSchema(String str, Schema schema, Map<String, Header> map, List<String> list, int i) {
        if (schema.type().isPrimitive()) {
            return schema;
        }
        SchemaBuilder copySchemaBasics = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
        for (org.apache.kafka.connect.data.Field field : schema.fields()) {
            if (isContainedIn(field.name(), list)) {
                i++;
                copySchemaBasics.field(field.name(), buildNewSchema(field.name(), field.schema(), map, list, i));
            } else {
                copySchemaBasics.field(field.name(), field.schema());
            }
        }
        LOGGER.debug("Fields copied from the old schema {}", copySchemaBasics.fields());
        for (int i2 = 0; i2 < this.headers.size(); i2++) {
            Header header = map.get(this.headers.get(i2));
            Optional<String> fieldName = getFieldName(this.fields.get(i2), str, i);
            LOGGER.trace("CurrentHeader {} - currentFieldName {}", this.headers.get(i2), fieldName);
            if (fieldName.isPresent() && header != null) {
                copySchemaBasics = copySchemaBasics.field(fieldName.get(), header.schema());
            }
        }
        LOGGER.debug("Fields added from headers {}", copySchemaBasics.fields());
        return copySchemaBasics.build();
    }

    private Optional<String> getFieldName(String str, String str2, int i) {
        String[] split = str.split(PartitionRouting.NESTING_SEPARATOR);
        return isRootField(str2, split) ? Optional.of(split[0]) : isChildrenOf(str2, i, split) ? Optional.of(split[i]) : Optional.empty();
    }

    private static boolean isChildrenOf(String str, int i, String[] strArr) {
        return strArr[i == 0 ? 0 : i - 1].equals(str);
    }

    private static boolean isRootField(String str, String[] strArr) {
        return strArr.length == 1 && str.equals("payload");
    }

    private String headersToString(Map<?, ?> map) {
        return (String) map.keySet().stream().map(obj -> {
            return obj + "=" + map.get(obj);
        }).collect(Collectors.joining(", ", "{", "}"));
    }

    public void close() {
    }
}
