package io.trino.plugin.exchange.filesystem;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.AsyncSemaphore;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeContext;
import io.trino.spi.exchange.ExchangeSinkHandle;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceSplitter;
import io.trino.spi.exchange.ExchangeSourceStatistics;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.concurrent.GuardedBy;
import javax.crypto.SecretKey;

/* loaded from: input_file:io/trino/plugin/exchange/filesystem/FileSystemExchange.class */
public class FileSystemExchange implements Exchange {
    private static final Pattern PARTITION_FILE_NAME_PATTERN = Pattern.compile("(\\d+)_(\\d+)\\.data");
    private static final char[] RANDOMIZED_HEX_PREFIX_ALPHABET = "abcdef0123456789".toCharArray();
    private static final int RANDOMIZED_HEX_PREFIX_LENGTH = 6;
    private final List<URI> baseDirectories;
    private final FileSystemExchangeStorage exchangeStorage;
    private final FileSystemExchangeStats stats;
    private final ExchangeContext exchangeContext;
    private final int outputPartitionCount;
    private final boolean preserveOrderWithinPartition;
    private final int fileListingParallelism;
    private final Optional<SecretKey> secretKey;
    private final ExecutorService executor;

    @GuardedBy("this")
    private boolean noMoreSinks;

    @GuardedBy("this")
    private boolean exchangeSourceHandlesCreationStarted;
    private final Map<Integer, String> randomizedPrefixes = new ConcurrentHashMap();

    @GuardedBy("this")
    private final Set<Integer> allSinks = new HashSet();

    @GuardedBy("this")
    private final Set<Integer> finishedSinks = new HashSet();
    private final CompletableFuture<List<ExchangeSourceHandle>> exchangeSourceHandlesFuture = new CompletableFuture<>();

    public FileSystemExchange(List<URI> list, FileSystemExchangeStorage fileSystemExchangeStorage, FileSystemExchangeStats fileSystemExchangeStats, ExchangeContext exchangeContext, int i, boolean z, int i2, Optional<SecretKey> optional, ExecutorService executorService) {
        ArrayList arrayList = new ArrayList((Collection) Objects.requireNonNull(list, "baseDirectories is null"));
        Collections.shuffle(arrayList);
        this.baseDirectories = ImmutableList.copyOf(arrayList);
        this.exchangeStorage = (FileSystemExchangeStorage) Objects.requireNonNull(fileSystemExchangeStorage, "exchangeStorage is null");
        this.stats = (FileSystemExchangeStats) Objects.requireNonNull(fileSystemExchangeStats, "stats is null");
        this.exchangeContext = (ExchangeContext) Objects.requireNonNull(exchangeContext, "exchangeContext is null");
        this.outputPartitionCount = i;
        this.preserveOrderWithinPartition = z;
        this.fileListingParallelism = i2;
        this.secretKey = (Optional) Objects.requireNonNull(optional, "secretKey is null");
        this.executor = (ExecutorService) Objects.requireNonNull(executorService, "executor is null");
    }

    public synchronized ExchangeSinkHandle addSink(int i) {
        FileSystemExchangeSinkHandle fileSystemExchangeSinkHandle = new FileSystemExchangeSinkHandle(i, this.secretKey.map((v0) -> {
            return v0.getEncoded();
        }));
        this.allSinks.add(Integer.valueOf(i));
        return fileSystemExchangeSinkHandle;
    }

    public void noMoreSinks() {
        synchronized (this) {
            this.noMoreSinks = true;
        }
        checkInputReady();
    }

    public ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle exchangeSinkHandle, int i) {
        FileSystemExchangeSinkHandle fileSystemExchangeSinkHandle = (FileSystemExchangeSinkHandle) exchangeSinkHandle;
        URI resolve = getTaskOutputDirectory(fileSystemExchangeSinkHandle.getPartitionId()).resolve(i + "/");
        try {
            this.exchangeStorage.createDirectories(resolve);
            return new FileSystemExchangeSinkInstanceHandle(fileSystemExchangeSinkHandle, resolve, this.outputPartitionCount, this.preserveOrderWithinPartition);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public void sinkFinished(ExchangeSinkInstanceHandle exchangeSinkInstanceHandle) {
        synchronized (this) {
            this.finishedSinks.add(Integer.valueOf(((FileSystemExchangeSinkInstanceHandle) exchangeSinkInstanceHandle).getSinkHandle().getPartitionId()));
        }
        checkInputReady();
    }

    private void checkInputReady() {
        Verify.verify(!Thread.holdsLock(this));
        ListenableFuture listenableFuture = null;
        synchronized (this) {
            if (this.exchangeSourceHandlesCreationStarted) {
                return;
            }
            if (this.noMoreSinks && this.finishedSinks.containsAll(this.allSinks)) {
                this.exchangeSourceHandlesCreationStarted = true;
                listenableFuture = (ListenableFuture) this.stats.getCreateExchangeSourceHandles().record(this::createExchangeSourceHandles);
                this.exchangeSourceHandlesFuture.whenComplete((list, th) -> {
                    if (this.exchangeSourceHandlesFuture.isCancelled()) {
                        this.exchangeSourceHandlesFuture.cancel(true);
                    }
                });
            }
            if (listenableFuture != null) {
                Futures.addCallback(listenableFuture, new FutureCallback<List<ExchangeSourceHandle>>() { // from class: io.trino.plugin.exchange.filesystem.FileSystemExchange.1
                    public void onSuccess(List<ExchangeSourceHandle> list2) {
                        FileSystemExchange.this.exchangeSourceHandlesFuture.complete(list2);
                    }

                    public void onFailure(Throwable th2) {
                        FileSystemExchange.this.exchangeSourceHandlesFuture.completeExceptionally(th2);
                    }
                }, MoreExecutors.directExecutor());
            }
        }
    }

    private ListenableFuture<List<ExchangeSourceHandle>> createExchangeSourceHandles() {
        ImmutableList copyOf;
        synchronized (this) {
            copyOf = ImmutableList.copyOf(this.finishedSinks);
        }
        return Futures.transform(AsyncSemaphore.processAll(copyOf, (v1) -> {
            return getCommittedPartitions(v1);
        }, this.fileListingParallelism, this.executor), list -> {
            ArrayListMultimap create = ArrayListMultimap.create();
            list.forEach(multimap -> {
                Objects.requireNonNull(create);
                multimap.forEach((v1, v2) -> {
                    r1.put(v1, v2);
                });
            });
            ImmutableList.Builder builder = ImmutableList.builder();
            for (Integer num : create.keySet()) {
                builder.add(new FileSystemExchangeSourceHandle(num.intValue(), ImmutableList.copyOf(create.get(num)), this.secretKey.map((v0) -> {
                    return v0.getEncoded();
                })));
            }
            return builder.build();
        }, this.executor);
    }

    private ListenableFuture<Multimap<Integer, FileStatus>> getCommittedPartitions(int i) {
        URI taskOutputDirectory = getTaskOutputDirectory(i);
        return this.stats.getGetCommittedPartitions().record(Futures.transform(this.exchangeStorage.listFilesRecursively(taskOutputDirectory), list -> {
            String str = (String) list.stream().map((v0) -> {
                return v0.getFilePath();
            }).filter(str2 -> {
                return str2.endsWith(FileSystemExchangeSink.COMMITTED_MARKER_FILE_NAME);
            }).findFirst().orElseThrow(() -> {
                return new IllegalStateException(String.format("No committed attempts found under sink output path %s", taskOutputDirectory));
            });
            String[] split = str.split(FileSystemExchangeManager.PATH_SEPARATOR);
            Preconditions.checkState(split.length >= 3, "committedMarkerFilePath %s is malformed", str);
            String str3 = split[split.length - 2];
            int length = ((str.length() - str3.length()) - FileSystemExchangeManager.PATH_SEPARATOR.length()) - FileSystemExchangeSink.COMMITTED_MARKER_FILE_NAME.length();
            List<FileStatus> list = (List) list.stream().filter(fileStatus -> {
                return fileStatus.getFilePath().startsWith(str3 + "/", length) && fileStatus.getFilePath().endsWith(FileSystemExchangeSink.DATA_FILE_SUFFIX);
            }).collect(ImmutableList.toImmutableList());
            ImmutableMultimap.Builder builder = ImmutableMultimap.builder();
            for (FileStatus fileStatus2 : list) {
                Matcher matcher = PARTITION_FILE_NAME_PATTERN.matcher(new File(fileStatus2.getFilePath()).getName());
                Preconditions.checkState(matcher.matches(), "Unexpected partition file: %s", fileStatus2);
                builder.put(Integer.valueOf(Integer.parseInt(matcher.group(1))), fileStatus2);
            }
            return builder.build();
        }, this.executor));
    }

    private URI getTaskOutputDirectory(int i) {
        return this.baseDirectories.get(i % this.baseDirectories.size()).resolve(this.randomizedPrefixes.computeIfAbsent(Integer.valueOf(i), num -> {
            return generateRandomizedHexPrefix();
        }) + "." + this.exchangeContext.getQueryId() + "." + this.exchangeContext.getExchangeId() + "." + i + "/");
    }

    public CompletableFuture<List<ExchangeSourceHandle>> getSourceHandles() {
        return this.exchangeSourceHandlesFuture;
    }

    public ExchangeSourceSplitter split(ExchangeSourceHandle exchangeSourceHandle, long j) {
        final FileSystemExchangeSourceHandle fileSystemExchangeSourceHandle = (FileSystemExchangeSourceHandle) exchangeSourceHandle;
        final Iterator<FileStatus> it = fileSystemExchangeSourceHandle.getFiles().iterator();
        return new ExchangeSourceSplitter() { // from class: io.trino.plugin.exchange.filesystem.FileSystemExchange.2
            public CompletableFuture<Void> isBlocked() {
                return CompletableFuture.completedFuture(null);
            }

            public Optional<ExchangeSourceHandle> getNext() {
                return it.hasNext() ? Optional.of(new FileSystemExchangeSourceHandle(fileSystemExchangeSourceHandle.getPartitionId(), ImmutableList.of((FileStatus) it.next()), FileSystemExchange.this.secretKey.map((v0) -> {
                    return v0.getEncoded();
                }))) : Optional.empty();
            }

            public void close() {
            }
        };
    }

    public ExchangeSourceStatistics getExchangeSourceStatistics(ExchangeSourceHandle exchangeSourceHandle) {
        return new ExchangeSourceStatistics(((FileSystemExchangeSourceHandle) exchangeSourceHandle).getFiles().stream().mapToLong((v0) -> {
            return v0.getFileSize();
        }).sum());
    }

    public void close() {
        this.stats.getCloseExchange().record(this.exchangeStorage.deleteRecursively((List) this.allSinks.stream().map((v1) -> {
            return getTaskOutputDirectory(v1);
        }).collect(ImmutableList.toImmutableList())));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String generateRandomizedHexPrefix() {
        char[] cArr = new char[RANDOMIZED_HEX_PREFIX_LENGTH];
        for (int i = 0; i < cArr.length; i++) {
            cArr[i] = RANDOMIZED_HEX_PREFIX_ALPHABET[ThreadLocalRandom.current().nextInt(RANDOMIZED_HEX_PREFIX_ALPHABET.length)];
        }
        return new String(cArr);
    }
}
