package io.trino.plugin.kafka;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.kafka.encoder.DispatchingRowEncoderFactory;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTransactionHandle;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.Optional;
import javax.inject.Inject;

/* loaded from: input_file:io/trino/plugin/kafka/KafkaPageSinkProvider.class */
public class KafkaPageSinkProvider implements ConnectorPageSinkProvider {
    private final DispatchingRowEncoderFactory encoderFactory;
    private final KafkaProducerFactory producerFactory;

    @Inject
    public KafkaPageSinkProvider(DispatchingRowEncoderFactory dispatchingRowEncoderFactory, KafkaProducerFactory kafkaProducerFactory) {
        this.encoderFactory = (DispatchingRowEncoderFactory) Objects.requireNonNull(dispatchingRowEncoderFactory, "encoderFactory is null");
        this.producerFactory = (KafkaProducerFactory) Objects.requireNonNull(kafkaProducerFactory, "producerFactory is null");
    }

    public ConnectorPageSink createPageSink(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorOutputTableHandle connectorOutputTableHandle) {
        throw new UnsupportedOperationException("Table creation is not supported by the kafka connector");
    }

    public ConnectorPageSink createPageSink(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorInsertTableHandle connectorInsertTableHandle) {
        Objects.requireNonNull(connectorInsertTableHandle, "tableHandle is null");
        KafkaTableHandle kafkaTableHandle = (KafkaTableHandle) connectorInsertTableHandle;
        ImmutableList.Builder builder = ImmutableList.builder();
        ImmutableList.Builder builder2 = ImmutableList.builder();
        kafkaTableHandle.getColumns().forEach(kafkaColumnHandle -> {
            if (kafkaColumnHandle.isInternal()) {
                throw new IllegalArgumentException(String.format("unexpected internal column '%s'", kafkaColumnHandle.getName()));
            }
            if (kafkaColumnHandle.isKeyCodec()) {
                builder.add(kafkaColumnHandle);
            } else {
                builder2.add(kafkaColumnHandle);
            }
        });
        return new KafkaPageSink(kafkaTableHandle.getTopicName(), kafkaTableHandle.getColumns(), this.encoderFactory.create(connectorSession, kafkaTableHandle.getKeyDataFormat(), getDataSchema(kafkaTableHandle.getKeyDataSchemaLocation()), builder.build()), this.encoderFactory.create(connectorSession, kafkaTableHandle.getMessageDataFormat(), getDataSchema(kafkaTableHandle.getMessageDataSchemaLocation()), builder2.build()), this.producerFactory);
    }

    private static Optional<String> getDataSchema(Optional<String> optional) {
        return optional.map(str -> {
            try {
                return Files.readString(Paths.get(str, new String[0]));
            } catch (IOException e) {
                throw new TrinoException(KafkaErrorCode.KAFKA_SCHEMA_ERROR, String.format("Unable to read data schema at '%s'", optional.get()), e);
            }
        });
    }
}
