package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;
import io.trino.execution.RemoteTask;
import io.trino.execution.TaskStatus;
import io.trino.execution.scheduler.ScheduleResult;
import io.trino.metadata.InternalNode;
import io.trino.spi.StandardErrorCode;
import io.trino.util.Failures;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/* loaded from: input_file:io/trino/execution/scheduler/ScaledWriterScheduler.class */
public class ScaledWriterScheduler implements StageScheduler {
    private final StageExecution stage;
    private final Supplier<Collection<TaskStatus>> sourceTasksProvider;
    private final Supplier<Collection<TaskStatus>> writerTasksProvider;
    private final NodeSelector nodeSelector;
    private final ScheduledExecutorService executor;
    private final long writerMinSizeBytes;
    private final Set<InternalNode> scheduledNodes = new HashSet();
    private final AtomicBoolean done = new AtomicBoolean();
    private volatile SettableFuture<Void> future = SettableFuture.create();

    public ScaledWriterScheduler(StageExecution stageExecution, Supplier<Collection<TaskStatus>> supplier, Supplier<Collection<TaskStatus>> supplier2, NodeSelector nodeSelector, ScheduledExecutorService scheduledExecutorService, DataSize dataSize) {
        this.stage = (StageExecution) Objects.requireNonNull(stageExecution, "stage is null");
        this.sourceTasksProvider = (Supplier) Objects.requireNonNull(supplier, "sourceTasksProvider is null");
        this.writerTasksProvider = (Supplier) Objects.requireNonNull(supplier2, "writerTasksProvider is null");
        this.nodeSelector = (NodeSelector) Objects.requireNonNull(nodeSelector, "nodeSelector is null");
        this.executor = (ScheduledExecutorService) Objects.requireNonNull(scheduledExecutorService, "executor is null");
        this.writerMinSizeBytes = ((DataSize) Objects.requireNonNull(dataSize, "writerMinSize is null")).toBytes();
    }

    public void finish() {
        this.done.set(true);
        this.future.set((Object) null);
    }

    @Override // io.trino.execution.scheduler.StageScheduler
    public ScheduleResult schedule() {
        List<RemoteTask> scheduleTasks = scheduleTasks(getNewTaskCount());
        this.future.set((Object) null);
        this.future = SettableFuture.create();
        this.executor.schedule(() -> {
            return Boolean.valueOf(this.future.set((Object) null));
        }, 200L, TimeUnit.MILLISECONDS);
        return new ScheduleResult(this.done.get(), (Iterable<? extends RemoteTask>) scheduleTasks, (ListenableFuture<Void>) this.future, ScheduleResult.BlockedReason.WRITER_SCALING, 0);
    }

    private int getNewTaskCount() {
        if (this.scheduledNodes.isEmpty()) {
            return 1;
        }
        return (this.sourceTasksProvider.get().stream().filter(taskStatus -> {
            return !taskStatus.getState().isDone();
        }).map((v0) -> {
            return v0.isOutputBufferOverutilized();
        }).mapToDouble(bool -> {
            return bool.booleanValue() ? 1.0d : 0.0d;
        }).average().orElse(0.0d) < 0.5d || this.writerTasksProvider.get().stream().map((v0) -> {
            return v0.getPhysicalWrittenDataSize();
        }).mapToLong((v0) -> {
            return v0.toBytes();
        }).sum() < this.writerMinSizeBytes * ((long) this.scheduledNodes.size())) ? 0 : 1;
    }

    private List<RemoteTask> scheduleTasks(int i) {
        if (i == 0) {
            return ImmutableList.of();
        }
        List<InternalNode> selectRandomNodes = this.nodeSelector.selectRandomNodes(i, this.scheduledNodes);
        Failures.checkCondition((this.scheduledNodes.isEmpty() && selectRandomNodes.isEmpty()) ? false : true, StandardErrorCode.NO_NODES_AVAILABLE, "No nodes available to run query", new Object[0]);
        ImmutableList.Builder builder = ImmutableList.builder();
        for (InternalNode internalNode : selectRandomNodes) {
            this.stage.scheduleTask(internalNode, this.scheduledNodes.size(), ImmutableMultimap.of(), ImmutableMultimap.of()).ifPresent(remoteTask -> {
                builder.add(remoteTask);
                this.scheduledNodes.add(internalNode);
            });
        }
        return builder.build();
    }
}
