package io.mantisrx.connector.iceberg.sink.writer;

import io.mantisrx.connector.iceberg.sink.writer.config.WriterConfig;
import io.mantisrx.runtime.WorkerInfo;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/connector/iceberg/sink/writer/DefaultIcebergWriter.class */
/**
 * {@link IcebergWriter} that appends records to one data file at a time.
 *
 * <p>Usage follows an open / write / close cycle: {@link #open(StructLike)} creates a new
 * output file and appender, {@link #write(MantisRecord)} appends records to it, and
 * {@link #close()} closes the appender and returns the resulting {@link MantisDataFile}.
 * Only {@link FileFormat#PARQUET} is supported; opening with any other configured format fails
 * with {@link UnsupportedOperationException}.
 */
public class DefaultIcebergWriter implements IcebergWriter {
    private static final Logger logger = LoggerFactory.getLogger(DefaultIcebergWriter.class);

    /** Table property that selects the Parquet compression codec. */
    private static final String PARQUET_COMPRESSION_CODEC_KEY = "write.parquet.compression-codec";

    /** Copy of the table's properties, with a default compression codec filled in when absent. */
    private final Map<String, String> tableProperties = new HashMap<>();
    private final WriterConfig config;
    private final WorkerInfo workerInfo;
    private final Table table;
    private final PartitionSpec spec;
    private final FileFormat format;
    private final LocationProvider locationProvider;

    /** Appender for the currently open file; {@code null} whenever the writer is closed. */
    private FileAppender<Record> appender;
    private OutputFile file;
    private StructLike partitionKey;

    /** Smallest record timestamp written to the current file, or {@code null} if none yet. */
    @Nullable
    private Long lowWatermark;

    /**
     * Creates a writer for the given table.
     *
     * <p>The table's properties are copied; if they do not already specify a Parquet
     * compression codec, ZSTD is used.
     *
     * @param writerConfig     writer configuration; supplies the file format name
     * @param workerInfo       worker identity used when generating file names
     * @param table            table whose schema, spec, location and IO are used for writing
     * @param locationProvider provider used to compute the data location that is logged on open
     * @throws IllegalArgumentException if the configured file format name is not a
     *                                  {@link FileFormat} constant
     */
    public DefaultIcebergWriter(WriterConfig writerConfig, WorkerInfo workerInfo, Table table, LocationProvider locationProvider) {
        this.config = writerConfig;
        this.workerInfo = workerInfo;
        this.table = table;
        this.spec = table.spec();
        this.format = FileFormat.valueOf(writerConfig.getWriterFileFormat());
        this.locationProvider = locationProvider;
        this.tableProperties.putAll(table.properties());
        if (!this.tableProperties.containsKey(PARQUET_COMPRESSION_CODEC_KEY)) {
            this.tableProperties.put(PARQUET_COMPRESSION_CODEC_KEY, CompressionCodecName.ZSTD.name());
        }
    }

    /**
     * Opens a new file appender without a partition key.
     *
     * @throws IOException if the appender cannot be created
     */
    @Override // io.mantisrx.connector.iceberg.sink.writer.IcebergWriter
    public void open() throws IOException {
        open(null);
    }

    /**
     * Opens a new file appender for the given partition key and resets the low watermark.
     *
     * @param structLike partition key for the new file; may be {@code null}
     * @throws IOException                   if the appender cannot be created
     * @throws UnsupportedOperationException if the configured format is not Parquet
     */
    @Override // io.mantisrx.connector.iceberg.sink.writer.IcebergWriter
    public void open(StructLike structLike) throws IOException {
        this.partitionKey = structLike;
        Path path = new Path(this.table.location(), generateFilename());
        // NOTE(review): the logged location comes from the LocationProvider, while the file below
        // is created at `path` under the table location — confirm the two are meant to differ.
        logger.info("opening new {} file appender {}", this.format, this.locationProvider.newDataLocation(path.toString()));
        this.file = this.table.io().newOutputFile(path.toString());

        // Parquet is the only format this writer can produce; AVRO and all others are rejected.
        if (this.format != FileFormat.PARQUET) {
            throw new UnsupportedOperationException("Cannot write using an unsupported file format " + this.format);
        }

        this.appender = Parquet.write(this.file)
                .schema(this.table.schema())
                .createWriterFunc(GenericParquetWriter::buildWriter)
                .setAll(this.tableProperties)
                .overwrite()
                .build();
        this.lowWatermark = null;
    }

    /**
     * Appends a record to the open file and lowers the low watermark to the record's timestamp
     * when that timestamp is smaller (null timestamps are ignored).
     *
     * <p>The writer must have been opened first; otherwise the appender is {@code null} and this
     * call fails with a {@link NullPointerException}.
     *
     * @param mantisRecord record to append
     */
    @Override // io.mantisrx.connector.iceberg.sink.writer.IcebergWriter
    public void write(MantisRecord mantisRecord) {
        this.appender.add(mantisRecord.getRecord());
        this.lowWatermark = minNullSafe(this.lowWatermark, mantisRecord.getTimestamp());
    }

    /**
     * Closes the current appender and describes the file that was written.
     *
     * <p>The writer is marked closed even if closing the appender or building the data file
     * fails.
     *
     * @return the written data file together with its low watermark, or {@code null} if the
     *         writer was already closed
     * @throws IOException if closing the appender fails
     */
    @Override // io.mantisrx.connector.iceberg.sink.writer.IcebergWriter
    public MantisDataFile close() throws IOException, UncheckedIOException {
        if (isClosed()) {
            return null;
        }
        try {
            this.appender.close();
            // An unpartitioned spec has no fields, so no partition tuple is attached.
            StructLike partition = this.spec.fields().size() == 0 ? null : this.partitionKey;
            return new MantisDataFile(
                    DataFiles.builder(this.spec)
                            .withPath(this.file.location())
                            .withInputFile(this.file.toInputFile())
                            .withFileSizeInBytes(this.appender.length())
                            .withPartition(partition)
                            .withMetrics(this.appender.metrics())
                            .withSplitOffsets(this.appender.splitOffsets())
                            .build(),
                    this.lowWatermark);
        } finally {
            this.appender = null;
            this.file = null;
        }
    }

    /**
     * @return {@code true} when no appender is currently open
     */
    @Override // io.mantisrx.connector.iceberg.sink.writer.IcebergWriter
    public boolean isClosed() {
        return this.appender == null;
    }

    /**
     * @return the length reported by the open appender, or {@code 0} when the writer is closed
     */
    @Override // io.mantisrx.connector.iceberg.sink.writer.IcebergWriter
    public long length() throws UncheckedIOException {
        if (this.appender == null) {
            return 0L;
        }
        return this.appender.length();
    }

    /**
     * @return the partition key passed to the most recent {@code open} call; may be {@code null}
     */
    @Override // io.mantisrx.connector.iceberg.sink.writer.IcebergWriter
    public StructLike getPartitionKey() {
        return this.partitionKey;
    }

    /**
     * Builds the table-relative path of a new data file. The file name combines the job id,
     * stage number, worker index, worker number and a random UUID, plus the format's extension.
     */
    private String generateFilename() {
        String baseName = String.format(
                "%s_%s_%s_%s_%s",
                this.workerInfo.getJobId(),
                this.workerInfo.getStageNumber(),
                this.workerInfo.getWorkerIndex(),
                this.workerInfo.getWorkerNumber(),
                UUID.randomUUID());
        return generateDataPath(generatePartitionPath(this.format.addExtension(baseName)));
    }

    /** Prefixes the given relative path with the {@code data/} directory. */
    private String generateDataPath(String str) {
        return String.format("data/%s", str);
    }

    /**
     * Prefixes the file name with the partition path of the current partition key, or returns
     * it unchanged when the spec is unpartitioned.
     */
    private String generatePartitionPath(String str) {
        if (this.spec.isUnpartitioned()) {
            return str;
        }
        return String.format("/%s/%s", this.spec.partitionToPath(this.partitionKey), str);
    }

    /**
     * Returns the smaller of two nullable values.
     *
     * @param l  first value; may be {@code null}
     * @param l2 second value; may be {@code null}
     * @return the minimum when both are present, the non-null one when only one is present, or
     *         {@code null} when both are {@code null}
     */
    public static Long minNullSafe(@Nullable Long l, @Nullable Long l2) {
        return compareNullSafe(l, l2, Math::min);
    }

    /**
     * Returns the larger of two nullable values.
     *
     * @param l  first value; may be {@code null}
     * @param l2 second value; may be {@code null}
     * @return the maximum when both are present, the non-null one when only one is present, or
     *         {@code null} when both are {@code null}
     */
    public static Long maxNullSafe(@Nullable Long l, @Nullable Long l2) {
        return compareNullSafe(l, l2, Math::max);
    }

    /**
     * Applies {@code biFunction} when both values are present; otherwise returns whichever value
     * is non-null (or {@code null} when neither is).
     */
    private static Long compareNullSafe(@Nullable Long l, @Nullable Long l2, BiFunction<Long, Long, Long> biFunction) {
        if (l == null) {
            return l2;
        }
        if (l2 == null) {
            return l;
        }
        return biFunction.apply(l, l2);
    }
}
