package io.castled.pipelines;

import com.google.common.collect.Lists;
import io.castled.ObjectRegistry;
import io.castled.commons.errors.errorclassifications.IncompatibleMappingError;
import io.castled.commons.models.DataSinkMessage;
import io.castled.commons.streams.DataSinkMessageInputStream;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.commons.streams.MessageInputStream;
import io.castled.schema.IncompatibleValueException;
import io.castled.schema.SchemaMapper;
import io.castled.schema.models.Field;
import io.castled.schema.models.FieldSchema;
import io.castled.schema.models.Message;
import io.castled.schema.models.RecordSchema;
import io.castled.schema.models.Tuple;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/castled/pipelines/SchemaMappedMessageInputStream.class */
public class SchemaMappedMessageInputStream implements DataSinkMessageInputStream {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SchemaMappedMessageInputStream.class);
    private final RecordSchema targetSchema;
    private final MessageInputStream messageInputStream;
    private final Map<String, List<String>> targetSourceMapping;
    private final Map<String, List<String>> sourceTargetMapping;
    private final ErrorOutputStream errorOutputStream;
    private long failedRecords = 0;
    private final SchemaMapper schemaMapper = (SchemaMapper) ObjectRegistry.getInstance(SchemaMapper.class);

    public SchemaMappedMessageInputStream(RecordSchema recordSchema, MessageInputStream messageInputStream, Map<String, List<String>> map, Map<String, List<String>> map2, ErrorOutputStream errorOutputStream) {
        this.targetSchema = recordSchema;
        this.messageInputStream = messageInputStream;
        this.targetSourceMapping = map;
        this.errorOutputStream = errorOutputStream;
        this.sourceTargetMapping = map2;
    }

    @Override // io.castled.commons.streams.DataSinkMessageInputStream
    public DataSinkMessage readMessage() throws Exception {
        DataSinkMessage mapMessage;
        do {
            Message readMessage = this.messageInputStream.readMessage();
            if (readMessage == null) {
                return null;
            }
            if (this.sourceTargetMapping == null) {
                return new DataSinkMessage(readMessage);
            }
            mapMessage = mapMessage(readMessage);
        } while (mapMessage == null);
        return mapMessage;
    }

    private DataSinkMessage mapMessage(Message message) {
        Tuple.Builder builder = Tuple.builder();
        ArrayList newArrayList = Lists.newArrayList();
        for (Map.Entry<String, List<String>> entry : this.targetSourceMapping.entrySet()) {
            FieldSchema fieldSchema = (FieldSchema) Optional.ofNullable(this.targetSchema).map(recordSchema -> {
                return recordSchema.getFieldSchema((String) entry.getKey());
            }).orElse(null);
            List<String> value = entry.getValue();
            if (fieldSchema == null) {
                String key = entry.getKey();
                for (String str : value) {
                    newArrayList.add(str);
                    Field field = message.getRecord().getField(str);
                    builder.put(new FieldSchema(key, field.getSchema(), field.getParams()), field.getValue());
                }
            } else if (CollectionUtils.isEmpty(value)) {
                continue;
            } else {
                for (String str2 : value) {
                    if (str2 != null) {
                        try {
                            builder.put(fieldSchema, this.schemaMapper.transformValue(message.getRecord().getValue(str2), fieldSchema.getSchema()));
                            newArrayList.add(str2);
                        } catch (IncompatibleValueException e) {
                            this.failedRecords++;
                            this.errorOutputStream.writeFailedRecord(new DataSinkMessage(message), new IncompatibleMappingError(str2, fieldSchema.getSchema()));
                            return null;
                        }
                    }
                }
            }
        }
        return new DataSinkMessage(new Message(message.getOffset(), builder.build()), (List) message.getRecord().getFields().stream().filter(field2 -> {
            return !newArrayList.contains(field2.getName());
        }).collect(Collectors.toList()));
    }

    private Message mapMessageFromSourceSchema(Message message) {
        Tuple.Builder builder = Tuple.builder();
        for (Field field : message.getRecord().getFields()) {
            List<String> list = this.sourceTargetMapping.get(field.getName());
            if (!CollectionUtils.isEmpty(list)) {
                list.forEach(str -> {
                    builder.put(new FieldSchema(str, field.getSchema(), field.getParams()), field.getValue());
                });
            }
        }
        return new Message(message.getOffset(), builder.build());
    }

    public void close() throws Exception {
        this.messageInputStream.close();
    }

    public long getFailedRecords() {
        return this.failedRecords;
    }
}
