package io.trino.operator.join;

import com.google.common.collect.Iterators;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.MoreFutures;
import io.trino.operator.WorkProcessor;
import io.trino.operator.join.DefaultPageJoiner;
import io.trino.operator.join.PageJoiner;
import io.trino.operator.join.PartitionedConsumption;
import io.trino.spi.Page;
import io.trino.spiller.PartitioningSpiller;
import io.trino.spiller.PartitioningSpillerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/* loaded from: input_file:io/trino/operator/join/SpillingJoinProcessor.class */
public class SpillingJoinProcessor implements WorkProcessor.Process<WorkProcessor<Page>> {
    private final Runnable afterClose;
    private final OptionalInt lookupJoinsCount;
    private final boolean waitForBuild;
    private final LookupSourceFactory lookupSourceFactory;
    private final ListenableFuture<LookupSourceProvider> lookupSourceProvider;
    private final PageJoiner.PageJoinerFactory pageJoinerFactory;
    private final PageJoiner sourcePagesJoiner;
    private final WorkProcessor<Page> joinedSourcePages;
    private boolean closed;

    @Nullable
    private ListenableFuture<PartitionedConsumption<Supplier<LookupSource>>> partitionedConsumption;

    @Nullable
    private Iterator<PartitionedConsumption.Partition<Supplier<LookupSource>>> lookupPartitions;

    @Nullable
    private PartitionedConsumption.Partition<Supplier<LookupSource>> previousPartition;

    @Nullable
    private ListenableFuture<Supplier<LookupSource>> previousPartitionLookupSource;

    public SpillingJoinProcessor(Runnable runnable, OptionalInt optionalInt, boolean z, LookupSourceFactory lookupSourceFactory, ListenableFuture<LookupSourceProvider> listenableFuture, PartitioningSpillerFactory partitioningSpillerFactory, PageJoiner.PageJoinerFactory pageJoinerFactory, WorkProcessor<Page> workProcessor) {
        this.afterClose = (Runnable) Objects.requireNonNull(runnable, "afterClose is null");
        this.lookupJoinsCount = (OptionalInt) Objects.requireNonNull(optionalInt, "lookupJoinsCount is null");
        this.waitForBuild = z;
        this.lookupSourceFactory = (LookupSourceFactory) Objects.requireNonNull(lookupSourceFactory, "lookupSourceFactory is null");
        this.lookupSourceProvider = (ListenableFuture) Objects.requireNonNull(listenableFuture, "lookupSourceProvider is null");
        this.pageJoinerFactory = (PageJoiner.PageJoinerFactory) Objects.requireNonNull(pageJoinerFactory, "pageJoinerFactory is null");
        this.sourcePagesJoiner = pageJoinerFactory.getPageJoiner(listenableFuture, Optional.of(partitioningSpillerFactory), Collections.emptyIterator());
        this.joinedSourcePages = workProcessor.transform(this.sourcePagesJoiner);
    }

    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        try {
            Closer create = Closer.create();
            try {
                Runnable runnable = this.afterClose;
                Objects.requireNonNull(runnable);
                create.register(runnable::run);
                create.register(this.sourcePagesJoiner);
                Optional<PartitioningSpiller> spiller = this.sourcePagesJoiner.getSpiller();
                Objects.requireNonNull(create);
                spiller.ifPresent((v1) -> {
                    r1.register(v1);
                });
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // io.trino.operator.WorkProcessor.Process
    public WorkProcessor.ProcessState<WorkProcessor<Page>> process() {
        if (this.waitForBuild && !this.lookupSourceProvider.isDone()) {
            return WorkProcessor.ProcessState.blocked(asVoid(this.lookupSourceProvider));
        }
        if (!this.joinedSourcePages.isFinished()) {
            return WorkProcessor.ProcessState.ofResult(this.joinedSourcePages);
        }
        if (this.partitionedConsumption == null) {
            this.partitionedConsumption = this.lookupSourceFactory.finishProbeOperator(this.lookupJoinsCount);
            return WorkProcessor.ProcessState.blocked(asVoid(this.partitionedConsumption));
        }
        if (this.lookupPartitions == null) {
            this.lookupPartitions = ((PartitionedConsumption) MoreFutures.getDone(this.partitionedConsumption)).beginConsumption();
        }
        if (this.previousPartition != null) {
            if (!this.previousPartitionLookupSource.isDone()) {
                return WorkProcessor.ProcessState.blocked(asVoid(this.previousPartitionLookupSource));
            }
            this.previousPartition.release();
            this.previousPartition = null;
            this.previousPartitionLookupSource = null;
        }
        if (!this.lookupPartitions.hasNext()) {
            close();
            return WorkProcessor.ProcessState.finished();
        }
        PartitionedConsumption.Partition<Supplier<LookupSource>> next = this.lookupPartitions.next();
        this.previousPartition = next;
        this.previousPartitionLookupSource = next.load();
        return WorkProcessor.ProcessState.ofResult(joinUnspilledPages(next));
    }

    private static <T> ListenableFuture<Void> asVoid(ListenableFuture<T> listenableFuture) {
        return Futures.transform(listenableFuture, obj -> {
            return null;
        }, MoreExecutors.directExecutor());
    }

    private WorkProcessor<Page> joinUnspilledPages(PartitionedConsumption.Partition<Supplier<LookupSource>> partition) {
        int number = partition.number();
        WorkProcessor fromIterator = WorkProcessor.fromIterator((Iterator) this.sourcePagesJoiner.getSpiller().map(partitioningSpiller -> {
            return partitioningSpiller.getSpilledPages(number);
        }).orElse(Collections.emptyIterator()));
        Iterator<DefaultPageJoiner.SavedRow> it = (Iterator) Optional.ofNullable(this.sourcePagesJoiner.getSpilledRows().remove(Integer.valueOf(number))).map(savedRow -> {
            return Iterators.singletonIterator(savedRow);
        }).orElse(Collections.emptyIterator());
        return fromIterator.transform(this.pageJoinerFactory.getPageJoiner(Futures.transform(partition.load(), supplier -> {
            return new StaticLookupSourceProvider((LookupSource) supplier.get());
        }, MoreExecutors.directExecutor()), Optional.empty(), it));
    }
}
