package io.mantisrx.connector.iceberg.sink.committer;

import io.mantisrx.connector.iceberg.sink.committer.config.CommitterConfig;
import io.mantisrx.connector.iceberg.sink.committer.config.CommitterProperties;
import io.mantisrx.connector.iceberg.sink.committer.metrics.CommitterMetrics;
import io.mantisrx.connector.iceberg.sink.committer.watermarks.WatermarkExtractor;
import io.mantisrx.connector.iceberg.sink.config.SinkProperties;
import io.mantisrx.connector.iceberg.sink.writer.MantisDataFile;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.codec.JacksonCodecs;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/mantisrx/connector/iceberg/sink/committer/IcebergCommitterStage.class */
public class IcebergCommitterStage implements ScalarComputation<MantisDataFile, Map<String, Object>> {
    private static final Logger logger = LoggerFactory.getLogger(IcebergCommitterStage.class);
    private Transformer transformer;

    /* loaded from: input_file:io/mantisrx/connector/iceberg/sink/committer/IcebergCommitterStage$Transformer.class */
    public static class Transformer implements Observable.Transformer<MantisDataFile, Map<String, Object>> {
        private final CommitterConfig config;
        private final CommitterMetrics metrics;
        private final IcebergCommitter committer;
        private final Scheduler scheduler;

        public Transformer(CommitterConfig committerConfig, CommitterMetrics committerMetrics, IcebergCommitter icebergCommitter, Scheduler scheduler) {
            this.config = committerConfig;
            this.metrics = committerMetrics;
            this.committer = icebergCommitter;
            this.scheduler = scheduler;
            IcebergCommitterStage.logger.info("Initialized IcebergCommitterStage with config: {}", committerConfig);
        }

        public Observable<Map<String, Object>> call(Observable<MantisDataFile> observable) {
            return observable.buffer(this.config.getCommitFrequencyMs(), TimeUnit.MILLISECONDS, this.scheduler).doOnNext(list -> {
                this.metrics.increment(CommitterMetrics.INVOCATION_COUNT);
            }).filter(list2 -> {
                return Boolean.valueOf(!list2.isEmpty());
            }).map(list3 -> {
                try {
                    long now = this.scheduler.now();
                    Map<String, Object> commit = this.committer.commit(list3);
                    this.metrics.record(CommitterMetrics.COMMIT_LATENCY_MSEC, this.scheduler.now() - now, TimeUnit.MILLISECONDS);
                    this.metrics.setGauge(CommitterMetrics.COMMIT_BATCH_SIZE, list3.size());
                    return commit;
                } catch (RuntimeException e) {
                    this.metrics.increment(CommitterMetrics.COMMIT_FAILURE_COUNT);
                    IcebergCommitterStage.logger.error("error committing to Iceberg", e);
                    return new HashMap();
                }
            }).filter(map -> {
                return Boolean.valueOf(!map.isEmpty());
            }).doOnNext(map2 -> {
                this.metrics.increment(CommitterMetrics.COMMIT_SUCCESS_COUNT);
                IcebergCommitterStage.logger.info("committed {}", map2);
            });
        }
    }

    public static ScalarToScalar.Config<MantisDataFile, Map<String, Object>> config() {
        return new ScalarToScalar.Config().description("").codec(JacksonCodecs.mapStringObject()).withParameters(parameters());
    }

    public static List<ParameterDefinition<?>> parameters() {
        return Arrays.asList(new StringParameter().name(SinkProperties.SINK_CATALOG).description(SinkProperties.SINK_CATALOG_DESCRIPTION).validator(Validators.notNullOrEmpty()).required().build(), new StringParameter().name(SinkProperties.SINK_DATABASE).description(SinkProperties.SINK_DATABASE_DESCRIPTION).validator(Validators.notNullOrEmpty()).required().build(), new StringParameter().name(SinkProperties.SINK_TABLE).description(SinkProperties.SINK_TABLE_DESCRIPTION).validator(Validators.notNullOrEmpty()).required().build(), new StringParameter().name(CommitterProperties.COMMIT_FREQUENCY_MS).description(CommitterProperties.COMMIT_FREQUENCY_DESCRIPTION).validator(Validators.alwaysPass()).defaultValue(CommitterProperties.COMMIT_FREQUENCY_MS_DEFAULT).build(), new StringParameter().name(CommitterProperties.WATERMARK_PROPERTY_KEY).description(CommitterProperties.WATERMARK_PROPERTY_DESCRIPTION).validator(Validators.alwaysPass()).build());
    }

    public static Transformer newTransformer(Context context) {
        CommitterConfig committerConfig = new CommitterConfig(context.getParameters());
        return new Transformer(committerConfig, new CommitterMetrics(), new IcebergCommitter(((Catalog) context.getServiceLocator().service(Catalog.class)).loadTable(TableIdentifier.of(new String[]{committerConfig.getCatalog(), committerConfig.getDatabase(), committerConfig.getTable()})), committerConfig, (WatermarkExtractor) context.getServiceLocator().service(WatermarkExtractor.class)), Schedulers.computation());
    }

    public void init(Context context) {
        this.transformer = newTransformer(context);
    }

    public Observable<Map<String, Object>> call(Context context, Observable<MantisDataFile> observable) {
        return observable.compose(this.transformer);
    }
}
