package io.trino.execution.buffer;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.execution.StateMachine;
import io.trino.execution.buffer.OutputBuffers;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.spi.exchange.ExchangeSink;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
/* loaded from: input_file:io/trino/execution/buffer/SpoolingExchangeOutputBuffer.class */
public class SpoolingExchangeOutputBuffer implements OutputBuffer {
    private static final Logger log = Logger.get(SpoolingExchangeOutputBuffer.class);
    private final OutputBufferStateMachine stateMachine;
    private final OutputBuffers outputBuffers;
    private ExchangeSink exchangeSink;
    private final Supplier<LocalMemoryContext> memoryContextSupplier;
    private final AtomicLong peakMemoryUsage = new AtomicLong();
    private final AtomicLong totalPagesAdded = new AtomicLong();
    private final AtomicLong totalRowsAdded = new AtomicLong();

    public SpoolingExchangeOutputBuffer(OutputBufferStateMachine outputBufferStateMachine, OutputBuffers outputBuffers, ExchangeSink exchangeSink, Supplier<LocalMemoryContext> supplier) {
        this.stateMachine = (OutputBufferStateMachine) Objects.requireNonNull(outputBufferStateMachine, "stateMachine is null");
        this.outputBuffers = (OutputBuffers) Objects.requireNonNull(outputBuffers, "outputBuffers is null");
        Preconditions.checkArgument(outputBuffers.getType() == OutputBuffers.BufferType.SPOOL, "Expected a SPOOL output buffer");
        this.exchangeSink = (ExchangeSink) Objects.requireNonNull(exchangeSink, "exchangeSink is null");
        this.memoryContextSupplier = (Supplier) Objects.requireNonNull(supplier, "memoryContextSupplier is null");
        outputBufferStateMachine.noMoreBuffers();
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public OutputBufferInfo getInfo() {
        BufferState state = this.stateMachine.getState();
        LocalMemoryContext systemMemoryContextOrNull = getSystemMemoryContextOrNull();
        return new OutputBufferInfo("EXTERNAL", state, false, state.canAddPages(), systemMemoryContextOrNull == null ? 0L : systemMemoryContextOrNull.getBytes(), this.totalPagesAdded.get(), this.totalRowsAdded.get(), this.totalPagesAdded.get(), ImmutableList.of());
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public BufferState getState() {
        return this.stateMachine.getState();
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public double getUtilization() {
        return 0.0d;
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public boolean isOverutilized() {
        return false;
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public void addStateChangeListener(StateMachine.StateChangeListener<BufferState> stateChangeListener) {
        this.stateMachine.addStateChangeListener(stateChangeListener);
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public void setOutputBuffers(OutputBuffers outputBuffers) {
        Objects.requireNonNull(outputBuffers, "newOutputBuffers is null");
        if (this.stateMachine.getState().isTerminal() || this.outputBuffers.getVersion() >= outputBuffers.getVersion()) {
            return;
        }
        this.outputBuffers.checkValidTransition(outputBuffers);
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public ListenableFuture<BufferResult> get(OutputBuffers.OutputBufferId outputBufferId, long j, DataSize dataSize) {
        throw new UnsupportedOperationException();
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public void acknowledge(OutputBuffers.OutputBufferId outputBufferId, long j) {
        throw new UnsupportedOperationException();
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public void destroy(OutputBuffers.OutputBufferId outputBufferId) {
        throw new UnsupportedOperationException();
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public ListenableFuture<Void> isFull() {
        ExchangeSink exchangeSink = this.exchangeSink;
        return exchangeSink != null ? MoreFutures.toListenableFuture(exchangeSink.isBlocked()) : Futures.immediateVoidFuture();
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public void enqueue(List<Slice> list) {
        enqueue(0, list);
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public void enqueue(int i, List<Slice> list) {
        Objects.requireNonNull(list, "pages is null");
        if (this.stateMachine.getState().canAddPages()) {
            ExchangeSink exchangeSink = this.exchangeSink;
            Preconditions.checkState(exchangeSink != null, "exchangeSink is null");
            Iterator<Slice> it = list.iterator();
            while (it.hasNext()) {
                exchangeSink.add(i, it.next());
                this.totalRowsAdded.addAndGet(PagesSerde.getSerializedPagePositionCount(r0));
            }
            updateMemoryUsage(exchangeSink.getMemoryUsage());
            this.totalPagesAdded.addAndGet(list.size());
        }
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public void setNoMorePages() {
        ExchangeSink exchangeSink;
        if (this.stateMachine.noMorePages() && (exchangeSink = this.exchangeSink) != null) {
            exchangeSink.finish().whenComplete((r5, th) -> {
                if (th != null) {
                    this.stateMachine.fail(th);
                } else {
                    this.stateMachine.finish();
                }
                this.exchangeSink = null;
                updateMemoryUsage(0L);
            });
        }
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public void destroy() {
        abort();
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public void abort() {
        ExchangeSink exchangeSink;
        if (this.stateMachine.abort() && (exchangeSink = this.exchangeSink) != null) {
            exchangeSink.abort().whenComplete((r5, th) -> {
                if (th != null) {
                    log.warn(th, "Error aborting exchange sink");
                }
                this.exchangeSink = null;
                updateMemoryUsage(0L);
            });
        }
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public long getPeakMemoryUsage() {
        return this.peakMemoryUsage.get();
    }

    @Override // io.trino.execution.buffer.OutputBuffer
    public Optional<Throwable> getFailureCause() {
        return this.stateMachine.getFailureCause();
    }

    private void updateMemoryUsage(long j) {
        LocalMemoryContext systemMemoryContextOrNull = getSystemMemoryContextOrNull();
        if (systemMemoryContextOrNull != null) {
            systemMemoryContextOrNull.setBytes(j);
        }
        updatePeakMemoryUsage(j);
    }

    private void updatePeakMemoryUsage(long j) {
        long j2;
        do {
            j2 = this.peakMemoryUsage.get();
            if (j2 >= j) {
                return;
            }
        } while (!this.peakMemoryUsage.compareAndSet(j2, j));
    }

    private LocalMemoryContext getSystemMemoryContextOrNull() {
        try {
            return this.memoryContextSupplier.get();
        } catch (RuntimeException e) {
            return null;
        }
    }
}
