package io.mantisrx.connector.iceberg.sink.writer;

import io.mantisrx.connector.iceberg.sink.codecs.IcebergCodecs;
import io.mantisrx.connector.iceberg.sink.writer.config.WriterConfig;
import io.mantisrx.connector.iceberg.sink.writer.config.WriterProperties;
import io.mantisrx.connector.iceberg.sink.writer.factory.DefaultIcebergWriterFactory;
import io.mantisrx.connector.iceberg.sink.writer.metrics.WriterMetrics;
import io.mantisrx.connector.iceberg.sink.writer.partitioner.Partitioner;
import io.mantisrx.connector.iceberg.sink.writer.partitioner.PartitionerFactory;
import io.mantisrx.connector.iceberg.sink.writer.pool.FixedIcebergWriterPool;
import io.mantisrx.connector.iceberg.sink.writer.pool.IcebergWriterPool;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.WorkerInfo;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.scheduler.MantisRxSingleThreadScheduler;
import io.mantisrx.shaded.com.google.common.annotations.VisibleForTesting;
import io.mantisrx.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.exceptions.Exceptions;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStage.class */
public class IcebergWriterStage implements ScalarComputation<MantisRecord, MantisDataFile> {
    /** Class-wide SLF4J logger; the nested {@link Transformer} logs through it as well. */
    private static final Logger logger = LoggerFactory.getLogger(IcebergWriterStage.class);
    /** Assigned by {@code init(Context)} and applied to the input stream by {@code call(Context, Observable)}. */
    private Transformer transformer;

    /* loaded from: input_file:io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStage$Transformer.class */
    public static class Transformer implements Observable.Transformer<MantisRecord, MantisDataFile> {
        /** Schema with a single required long column {@code ts_utc_msec} (field id 1); used to recognize timer ticks. */
        private static final Schema TIMEOUT_SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required(1, "ts_utc_msec", Types.LongType.get())});
        /**
         * Sentinel emitted on every flush interval: a {@link MantisRecord} around a generic record created
         * from {@link #TIMEOUT_SCHEMA}. The second constructor argument is {@code null}; its meaning is
         * defined by {@code MantisRecord}, which is not visible in this file.
         */
        private static final MantisRecord TIMEOUT_RECORD = new MantisRecord(GenericRecord.create(TIMEOUT_SCHEMA), null);
        /** Supplies the flush frequency and the row-group size used as the count threshold. */
        private final WriterConfig config;
        /** Counters and gauges updated on open, write, and batch events. */
        private final WriterMetrics metrics;
        /** Maps each incoming record to the partition whose writer receives it. */
        private final Partitioner partitioner;
        /** Per-partition writers that are opened, written to, and closed by this transformer. */
        private final IcebergWriterPool writerPool;
        /** Scheduler that drives the periodic interval producing {@link #TIMEOUT_RECORD}. */
        private final Scheduler timerScheduler;
        /** Scheduler the merged stream is observed on before any writer work happens. */
        private final Scheduler transformerScheduler;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStage$Transformer$Trigger.class */
        /**
         * Mutable flush state carried through the transformer's {@code scan} step.
         *
         * <p>It counts written records against a fixed threshold and remembers which partitions'
         * writers have been marked for closing. The state is not synchronized; it is only touched
         * from the operators of the transformer pipeline.
         */
        public static class Trigger {
            /** Number of counted records at which {@link #isOverCountThreshold()} becomes true. */
            private final int countThreshold;
            /** Partitions whose writers are scheduled to be closed on the next flush. */
            private final Set<StructLike> writers = new HashSet<>();
            /** Records counted since construction or the last {@link #reset()}. */
            private int counter;

            /**
             * @param i count threshold compared against the record counter
             */
            Trigger(int i) {
                this.countThreshold = i;
            }

            /** Counts one more record. */
            void increment() {
                this.counter++;
            }

            /** Zeroes the record counter and forgets every tracked writer. */
            void reset() {
                this.counter = 0;
                this.writers.clear();
            }

            /**
             * Marks the given partitions for flushing; the elements are copied into this trigger's own set.
             *
             * @param set partitions to add to the tracked writers
             */
            void trackAll(Set<StructLike> set) {
                this.writers.addAll(set);
            }

            /**
             * @return the live (not copied) set of tracked partitions; {@link #reset()} clears it
             */
            Set<StructLike> getTrackedWriters() {
                return this.writers;
            }

            /**
             * @return {@code true} once the counter has reached or passed the threshold
             */
            boolean isOverCountThreshold() {
                return this.counter >= this.countThreshold;
            }

            /**
             * @return {@code true} when at least one writer is tracked for flushing
             */
            boolean shouldFlush() {
                return !this.writers.isEmpty();
            }

            @Override
            public String toString() {
                return "Trigger{ countThreshold=" + this.countThreshold + ", writers=" + this.writers + ", counter=" + this.counter + '}';
            }
        }

        /**
         * Wires a transformer from its collaborators; every argument is stored as given.
         *
         * @param writerConfig      writer settings (flush frequency, row-group size)
         * @param writerMetrics     metrics sink for open/write/batch events
         * @param icebergWriterPool pool of per-partition writers
         * @param partitioner       derives the partition of each record
         * @param scheduler         scheduler for the periodic flush timer
         * @param scheduler2        scheduler on which records are processed
         */
        public Transformer(WriterConfig writerConfig, WriterMetrics writerMetrics, IcebergWriterPool icebergWriterPool, Partitioner partitioner, Scheduler scheduler, Scheduler scheduler2) {
            // Threading: the first scheduler ticks the timer, the second runs the pipeline.
            this.timerScheduler = scheduler;
            this.transformerScheduler = scheduler2;

            // Writing collaborators.
            this.partitioner = partitioner;
            this.writerPool = icebergWriterPool;

            // Settings and instrumentation.
            this.config = writerConfig;
            this.metrics = writerMetrics;
        }

        /**
         * Turns a stream of records into a stream of completed data files.
         *
         * <p>The input is merged with a periodic {@code TIMEOUT_RECORD} tick, observed on the
         * transformer scheduler, and folded into a {@link Trigger}. Whenever the trigger has tracked
         * writers, those writers are closed and each resulting data file is emitted downstream.
         * When the stream terminates, all writers in the pool are closed. The returned observable
         * is shared among subscribers.
         *
         * @param observable incoming records
         * @return data files produced by closing writers
         */
        @Override
        public Observable<MantisDataFile> call(Observable<MantisRecord> observable) {
            Observable<MantisRecord> timeouts = Observable
                    .interval(this.config.getWriterFlushFrequencyMsec(), TimeUnit.MILLISECONDS, this.timerScheduler)
                    .map(tick -> TIMEOUT_RECORD);

            return observable
                    .mergeWith(timeouts)
                    .observeOn(this.transformerScheduler)
                    .scan(new Trigger(this.config.getWriterRowGroupSize()), this::accumulate)
                    .filter(Trigger::shouldFlush)
                    .map(this::flush)
                    .filter(dataFiles -> !dataFiles.isEmpty())
                    .flatMapIterable(dataFiles -> dataFiles)
                    .doOnNext(mantisDataFile -> {
                        this.metrics.increment(WriterMetrics.BATCH_SUCCESS_COUNT);
                        IcebergWriterStage.logger.info("writing DataFile: {}", mantisDataFile);
                        this.metrics.setGauge(WriterMetrics.BATCH_SIZE, mantisDataFile.getDataFile().recordCount());
                        this.metrics.setGauge(WriterMetrics.BATCH_SIZE_BYTES, mantisDataFile.getDataFile().fileSizeInBytes());
                    })
                    .doOnTerminate(() -> {
                        try {
                            IcebergWriterStage.logger.info("closing writer on rx terminate signal");
                            this.writerPool.closeAll();
                        } catch (IOException e) {
                            throw Exceptions.propagate(e);
                        }
                    })
                    .share();
        }

        /** Scan accumulator: handles one timer tick or writes one record, updating the trigger in place. */
        private Trigger accumulate(Trigger trigger, MantisRecord mantisRecord) {
            // A tick is recognized by its schema, not by identity.
            // NOTE(review): a real record whose struct has exactly these fields would also be treated as a tick.
            if (mantisRecord.getRecord().struct().fields().equals(TIMEOUT_SCHEMA.columns())) {
                trigger.trackAll(this.writerPool.getWriters());
                return trigger;
            }

            StructLike partition = this.partitioner.partition(mantisRecord.getRecord());
            if (this.writerPool.isClosed(partition)) {
                try {
                    IcebergWriterStage.logger.info("opening file for partition {}", partition);
                    this.writerPool.open(partition);
                    this.metrics.increment(WriterMetrics.OPEN_SUCCESS_COUNT);
                } catch (IOException e) {
                    // Failing to open a writer is not recoverable here: rethrow unchecked into the stream.
                    this.metrics.increment(WriterMetrics.OPEN_FAILURE_COUNT);
                    throw Exceptions.propagate(e);
                }
            }

            try {
                this.writerPool.write(partition, mantisRecord);
                trigger.increment();
                if (trigger.isOverCountThreshold()) {
                    trigger.trackAll(this.writerPool.getFlushableWriters());
                }
                this.metrics.increment(WriterMetrics.WRITE_SUCCESS_COUNT);
            } catch (RuntimeException e) {
                // Best effort: the failure is counted and logged and the stream keeps going.
                // The throwable is passed as the trailing argument so the cause is not lost.
                this.metrics.increment(WriterMetrics.WRITE_FAILURE_COUNT);
                IcebergWriterStage.logger.debug("error writing record {}", mantisRecord, e);
            }
            return trigger;
        }

        /** Closes every writer tracked by the trigger, resets the trigger, and returns the files that closed successfully. */
        private List<MantisDataFile> flush(Trigger trigger) {
            List<MantisDataFile> dataFiles = new ArrayList<>();
            for (StructLike partition : trigger.getTrackedWriters()) {
                try {
                    dataFiles.add(this.writerPool.close(partition));
                } catch (IOException | RuntimeException e) {
                    // One failing writer must not prevent the remaining writers from being closed.
                    this.metrics.increment(WriterMetrics.BATCH_FAILURE_COUNT);
                    IcebergWriterStage.logger.error("error writing DataFile", e);
                }
            }
            trigger.reset();
            return dataFiles;
        }
    }

    /**
     * Builds the stage configuration: empty description, the {@code MantisDataFile} codec from
     * {@link IcebergCodecs}, serial input, and the parameters returned by {@link #parameters()}.
     *
     * @return configuration for a stage mapping {@code MantisRecord} to {@code MantisDataFile}
     */
    public static ScalarToScalar.Config<MantisRecord, MantisDataFile> config() {
        // Explicit type arguments: a diamond on the receiver of a call chain would not infer them.
        return new ScalarToScalar.Config<MantisRecord, MantisDataFile>()
                .description("")
                .codec(IcebergCodecs.mantisDataFile())
                .serialInput()
                .withParameters(parameters());
    }

    /**
     * Declares the job parameters understood by the writer stage, in this order: row-group size,
     * flush frequency in bytes, flush frequency in milliseconds, file format, and maximum pool size.
     * Every definition uses the always-pass validator.
     *
     * @return a fixed-size list of the five parameter definitions
     */
    public static List<ParameterDefinition<?>> parameters() {
        ParameterDefinition<?> rowGroupSize = new IntParameter()
                .name(WriterProperties.WRITER_ROW_GROUP_SIZE)
                .description(WriterProperties.WRITER_ROW_GROUP_SIZE_DESCRIPTION)
                .validator(Validators.alwaysPass())
                .defaultValue(100)
                .build();

        ParameterDefinition<?> flushFrequencyBytes = new StringParameter()
                .name(WriterProperties.WRITER_FLUSH_FREQUENCY_BYTES)
                .description(WriterProperties.WRITER_FLUSH_FREQUENCY_BYTES_DESCRIPTION)
                .validator(Validators.alwaysPass())
                .defaultValue(WriterProperties.WRITER_FLUSH_FREQUENCY_BYTES_DEFAULT)
                .build();

        ParameterDefinition<?> flushFrequencyMsec = new StringParameter()
                .name(WriterProperties.WRITER_FLUSH_FREQUENCY_MSEC)
                .description(WriterProperties.WRITER_FLUSH_FREQUENCY_MSEC_DESCRIPTION)
                .validator(Validators.alwaysPass())
                .defaultValue(WriterProperties.WRITER_FLUSH_FREQUENCY_MSEC_DEFAULT)
                .build();

        ParameterDefinition<?> fileFormat = new StringParameter()
                .name(WriterProperties.WRITER_FILE_FORMAT)
                .description(WriterProperties.WRITER_FILE_FORMAT_DESCRIPTION)
                .validator(Validators.alwaysPass())
                .defaultValue(WriterProperties.WRITER_FILE_FORMAT_DEFAULT)
                .build();

        ParameterDefinition<?> maximumPoolSize = new IntParameter()
                .name(WriterProperties.WRITER_MAXIMUM_POOL_SIZE)
                .description(WriterProperties.WRITER_MAXIMUM_POOL_SIZE_DESCRIPTION)
                .validator(Validators.alwaysPass())
                .defaultValue(5)
                .build();

        return Arrays.asList(rowGroupSize, flushFrequencyBytes, flushFrequencyMsec, fileFormat, maximumPoolSize);
    }

    /**
     * Assembles a {@link Transformer} from the services registered in the job context.
     *
     * <p>The writer configuration is built from the context parameters and the Hadoop
     * {@code Configuration} service; the table named by that configuration (catalog, database,
     * table) is loaded through the {@code Catalog} service and handed to both the writer factory
     * and the partitioner factory.
     *
     * @param context job context providing parameters, services, worker info and class loader
     * @return a transformer with fresh metrics and a fixed writer pool
     */
    public static Transformer newTransformer(Context context) {
        WriterConfig writerConfig = new WriterConfig(
                context.getParameters(),
                (Configuration) context.getServiceLocator().service(Configuration.class));

        Catalog catalog = (Catalog) context.getServiceLocator().service(Catalog.class);
        TableIdentifier tableId = TableIdentifier.of(
                writerConfig.getCatalog(),
                writerConfig.getDatabase(),
                writerConfig.getTable());
        Table table = catalog.loadTable(tableId);

        WriterMetrics writerMetrics = new WriterMetrics();

        DefaultIcebergWriterFactory writerFactory = new DefaultIcebergWriterFactory(
                writerConfig,
                context.getWorkerInfo(),
                table,
                (LocationProvider) context.getServiceLocator().service(LocationProvider.class));
        IcebergWriterPool writerPool = new FixedIcebergWriterPool(writerFactory, writerConfig);

        PartitionerFactory partitionerFactory =
                (PartitionerFactory) context.getServiceLocator().service(PartitionerFactory.class);
        Partitioner tablePartitioner = partitionerFactory.getPartitioner(table);

        return newTransformer(
                writerConfig,
                writerMetrics,
                writerPool,
                tablePartitioner,
                context.getWorkerInfo(),
                context.getClassLoader());
    }

    /**
     * Creates a {@link Transformer} whose timer runs on the RxJava computation scheduler and whose
     * record processing runs on a dedicated single-thread scheduler.
     *
     * <p>Threads of the dedicated scheduler are named {@code IcebergWriter (N)-%d}, with N being the
     * worker index plus one. When a class loader is supplied, it becomes the context class loader of
     * every thread the factory creates.
     *
     * @param writerConfig      writer settings
     * @param writerMetrics     metrics sink
     * @param icebergWriterPool pool of per-partition writers
     * @param partitioner       record partitioner
     * @param workerInfo        source of the worker index used in thread names
     * @param classLoader       optional context class loader for writer threads; may be {@code null}
     * @return the configured transformer
     */
    @VisibleForTesting
    static Transformer newTransformer(WriterConfig writerConfig, WriterMetrics writerMetrics, IcebergWriterPool icebergWriterPool, Partitioner partitioner, WorkerInfo workerInfo, @Nullable ClassLoader classLoader) {
        String threadNameFormat = "IcebergWriter (" + (workerInfo.getWorkerIndex() + 1) + ")-%d";
        ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder().setNameFormat(threadNameFormat);

        if (classLoader != null) {
            // Backing factory that pins the supplied class loader on each new thread.
            threadFactoryBuilder.setThreadFactory(task -> {
                Thread writerThread = new Thread(task);
                writerThread.setContextClassLoader(classLoader);
                return writerThread;
            });
        }

        Scheduler timerScheduler = Schedulers.computation();
        Scheduler transformerScheduler = new MantisRxSingleThreadScheduler(threadFactoryBuilder.build());
        return new Transformer(
                writerConfig,
                writerMetrics,
                icebergWriterPool,
                partitioner,
                timerScheduler,
                transformerScheduler);
    }

    /**
     * Builds the transformer for this stage from the job context and stores it for later use by
     * {@code call(Context, Observable)}.
     *
     * @param context job context passed to {@link #newTransformer(Context)}
     */
    @Override
    public void init(Context context) {
        this.transformer = newTransformer(context);
    }

    /**
     * Applies the transformer created in {@code init(Context)} to the incoming record stream.
     *
     * @param context    job context; not used by this method
     * @param observable incoming records
     * @return the stream of data files produced by the transformer
     */
    @Override
    public Observable<MantisDataFile> call(Context context, Observable<MantisRecord> observable) {
        return observable.compose(this.transformer);
    }
}
