package io.trino.plugin.raptor.legacy;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.airlift.concurrent.Threads;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.raptor.legacy.backup.BackupService;
import io.trino.plugin.raptor.legacy.metadata.BucketShards;
import io.trino.plugin.raptor.legacy.metadata.ShardManager;
import io.trino.plugin.raptor.legacy.metadata.ShardNodes;
import io.trino.plugin.raptor.legacy.util.SynchronizedResultIterator;
import io.trino.spi.HostAddress;
import io.trino.spi.Node;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.predicate.TupleDomain;
import jakarta.annotation.PreDestroy;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jdbi.v3.core.result.ResultIterator;

/**
 * Split manager for the legacy Raptor connector.
 *
 * <p>{@link #getSplits} returns a {@link ConnectorSplitSource} that walks the shard-to-node
 * assignments supplied by the {@link ShardManager} and turns every {@link BucketShards} entry into
 * a {@link RaptorSplit}. Batches are produced asynchronously on a cached pool of daemon threads
 * owned by this manager; {@link #destroy()} shuts that pool down.
 */
public class RaptorSplitManager implements ConnectorSplitManager {
    private final NodeSupplier nodeSupplier;
    private final ShardManager shardManager;
    private final boolean backupAvailable;
    private final ExecutorService executor;

    /**
     * Split source for one table scan.
     *
     * <p>The worker nodes are read once, at construction time, and indexed by node identifier; the
     * shard iterator obtained from the shard manager is wrapped in a
     * {@link SynchronizedResultIterator}. At most one batch may be outstanding at a time.
     */
    private class RaptorSplitSource implements ConnectorSplitSource {
        private final Map<String, Node> nodesById;
        private final long tableId;
        private final Optional<List<String>> bucketToNode;
        private final ResultIterator<BucketShards> iterator;

        @GuardedBy("this")
        private CompletableFuture<ConnectorSplitSource.ConnectorSplitBatch> future;

        /**
         * @param tableId identifier of the table being scanned
         * @param merged flag forwarded to {@code ShardManager.getShardNodesBucketed}; the caller sets
         *     it when the table is bucketed and its bucket count reaches the one-split-per-bucket
         *     threshold
         * @param effectivePredicate predicate forwarded to the shard manager
         * @param bucketToNode node identifier per bucket number, present only for bucketed tables
         * @throws NullPointerException if {@code bucketToNode} is null
         */
        public RaptorSplitSource(long tableId, boolean merged, TupleDomain<RaptorColumnHandle> effectivePredicate, Optional<List<String>> bucketToNode) {
            this.nodesById = Maps.uniqueIndex(nodeSupplier.getWorkerNodes(), Node::getNodeIdentifier);
            this.tableId = tableId;
            this.bucketToNode = Objects.requireNonNull(bucketToNode, "bucketToNode is null");

            ResultIterator<BucketShards> shards;
            if (bucketToNode.isPresent()) {
                shards = shardManager.getShardNodesBucketed(tableId, merged, bucketToNode.get(), effectivePredicate);
            } else {
                shards = shardManager.getShardNodes(tableId, effectivePredicate);
            }
            this.iterator = new SynchronizedResultIterator<>(shards);
        }

        /**
         * Starts fetching the next batch on the manager's executor.
         *
         * @param maxSize upper bound on the number of splits in the batch
         * @return future completed with the batch
         * @throws IllegalStateException if the previously returned future has not completed yet
         */
        public synchronized CompletableFuture<ConnectorSplitSource.ConnectorSplitBatch> getNextBatch(int maxSize) {
            Preconditions.checkState(future == null || future.isDone(), "previous batch not completed");
            future = CompletableFuture.supplyAsync(batchSupplier(maxSize), executor);
            return future;
        }

        /**
         * Cancels the outstanding batch future, if any, and schedules the shard iterator to be closed.
         */
        public synchronized void close() {
            if (future != null) {
                // Per the JDK documentation, CompletableFuture.cancel does not interrupt the task
                // that is computing the value; it only completes the future as cancelled.
                future.cancel(true);
                future = null;
            }
            // The iterator is closed on the executor rather than on the calling thread.
            // NOTE(review): presumably so that close() never blocks the caller -- confirm.
            executor.execute(iterator::close);
        }

        /** Returns {@code true} once the shard iterator has no further elements. */
        public boolean isFinished() {
            return !iterator.hasNext();
        }

        /**
         * Builds the task that assembles one batch: up to {@code maxSize} splits, stopping early when
         * the iterator runs dry. The task fails with a {@link RuntimeException} if the executing
         * thread is found interrupted before a split is fetched.
         */
        private Supplier<ConnectorSplitSource.ConnectorSplitBatch> batchSupplier(int maxSize) {
            return () -> {
                ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
                for (int count = 0; count < maxSize; count++) {
                    if (Thread.currentThread().isInterrupted()) {
                        throw new RuntimeException("Split batch fetch was interrupted");
                    }
                    if (!iterator.hasNext()) {
                        break;
                    }
                    splits.add(createSplit(iterator.next()));
                }
                return new ConnectorSplitSource.ConnectorSplitBatch(splits.build(), isFinished());
            };
        }

        /**
         * Converts one iterator element into a split.
         *
         * <p>Entries that carry a bucket number become a bucket split. Any other entry must hold
         * exactly one shard, which is addressed to the known worker nodes that host it.
         *
         * @throws TrinoException if no known node hosts the shard and either no backup is available
         *     or there is no worker node to reassign the shard to
         */
        private ConnectorSplit createSplit(BucketShards bucketShards) {
            if (bucketShards.getBucketNumber().isPresent()) {
                return createBucketSplit(bucketShards.getBucketNumber().getAsInt(), bucketShards.getShards());
            }

            Verify.verify(bucketShards.getShards().size() == 1, "wrong shard count for non-bucketed table");
            ShardNodes shard = Iterables.getOnlyElement(bucketShards.getShards());
            UUID shardUuid = shard.getShardUuid();
            Set<String> nodeIds = shard.getNodeIdentifiers();

            List<HostAddress> addresses = getAddressesForNodes(nodesById, nodeIds);
            if (addresses.isEmpty()) {
                if (!backupAvailable) {
                    throw new TrinoException(RaptorErrorCode.RAPTOR_NO_HOST_FOR_SHARD, String.format("No host for shard %s found: %s", shardUuid, nodeIds));
                }

                // None of the assigned nodes is in the node snapshot: re-read the current workers,
                // pick one at random and record it as the shard's new assignment.
                // NOTE(review): presumably the chosen node then restores the shard from backup, and
                // the trailing 'true' is an opaque flag of replaceShardAssignment -- confirm both.
                Set<Node> availableNodes = nodeSupplier.getWorkerNodes();
                if (availableNodes.isEmpty()) {
                    throw new TrinoException(StandardErrorCode.NO_NODES_AVAILABLE, "No nodes available to run query");
                }
                Node node = selectRandom(availableNodes);
                shardManager.replaceShardAssignment(tableId, shardUuid, node.getNodeIdentifier(), true);
                addresses = ImmutableList.of(node.getHostAndPort());
            }
            return new RaptorSplit(shardUuid, addresses);
        }

        /**
         * Creates a split covering all shards of one bucket, addressed to the node assigned to that
         * bucket.
         *
         * @throws TrinoException if the assigned node is not among the nodes known to this source
         */
        private ConnectorSplit createBucketSplit(int bucketNumber, Set<ShardNodes> shards) {
            String nodeId = bucketToNode.get().get(bucketNumber);
            Node node = nodesById.get(nodeId);
            if (node == null) {
                throw new TrinoException(StandardErrorCode.NO_NODES_AVAILABLE, "Node for bucket is offline: " + nodeId);
            }
            Set<UUID> shardUuids = shards.stream()
                    .map(ShardNodes::getShardUuid)
                    .collect(Collectors.toSet());
            return new RaptorSplit(shardUuids, bucketNumber, node.getHostAndPort());
        }
    }

    /**
     * Injection constructor; backup availability is read from the backup service.
     *
     * @throws NullPointerException if {@code nodeSupplier}, {@code shardManager} or
     *     {@code backupService} is null
     */
    @Inject
    public RaptorSplitManager(CatalogName catalogName, NodeSupplier nodeSupplier, ShardManager shardManager, BackupService backupService) {
        this(catalogName, nodeSupplier, shardManager, Objects.requireNonNull(backupService, "backupService is null").isBackupAvailable());
    }

    /**
     * @param catalogName used only to name the executor's threads
     * @param nodeSupplier source of the current worker nodes
     * @param shardManager source of shard assignments
     * @param backupAvailable whether a shard that has no live host may be reassigned to another
     *     worker instead of failing the query
     * @throws NullPointerException if {@code nodeSupplier} or {@code shardManager} is null
     */
    public RaptorSplitManager(CatalogName catalogName, NodeSupplier nodeSupplier, ShardManager shardManager, boolean backupAvailable) {
        this.nodeSupplier = Objects.requireNonNull(nodeSupplier, "nodeSupplier is null");
        this.shardManager = Objects.requireNonNull(shardManager, "shardManager is null");
        this.backupAvailable = backupAvailable;
        this.executor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("raptor-split-" + catalogName + "-%s"));
    }

    /** Shuts the split executor down immediately. */
    @PreDestroy
    public void destroy() {
        executor.shutdownNow();
    }

    /**
     * Creates the split source for a Raptor table.
     *
     * <p>The predicate is taken from the table handle's own constraint; the
     * {@code connectorTransactionHandle}, {@code dynamicFilter} and {@code constraint} arguments
     * are not used.
     *
     * @param connectorTableHandle must be a {@link RaptorTableHandle}
     */
    public ConnectorSplitSource getSplits(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorTableHandle connectorTableHandle, DynamicFilter dynamicFilter, Constraint constraint) {
        RaptorTableHandle table = (RaptorTableHandle) connectorTableHandle;
        long tableId = table.getTableId();
        boolean bucketed = table.getBucketCount().isPresent();
        boolean merged = bucketed && table.getBucketCount().getAsInt() >= RaptorSessionProperties.getOneSplitPerBucketThreshold(connectorSession);
        Optional<List<String>> bucketToNode = table.getBucketAssignments();
        Verify.verify(bucketed == bucketToNode.isPresent(), "mismatched bucketCount and bucketToNode presence");
        return new RaptorSplitSource(tableId, merged, table.getConstraint(), bucketToNode);
    }

    /**
     * Resolves node identifiers to host addresses, silently skipping identifiers that are absent
     * from {@code nodeMap}.
     */
    private static List<HostAddress> getAddressesForNodes(Map<String, Node> nodeMap, Iterable<String> nodeIdentifiers) {
        ImmutableList.Builder<HostAddress> addresses = ImmutableList.builder();
        for (String nodeId : nodeIdentifiers) {
            Node node = nodeMap.get(nodeId);
            if (node != null) {
                addresses.add(node.getHostAndPort());
            }
        }
        return addresses.build();
    }

    /**
     * Returns a uniformly random element of {@code elements}, which must not be empty.
     */
    private static <T> T selectRandom(Iterable<T> elements) {
        List<T> candidates = ImmutableList.copyOf(elements);
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
    }
}
