package io.trino.plugin.exchange.hdfs;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.InputStreamSliceInput;
import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.trino.hadoop.ConfigurationInstantiator;
import io.trino.plugin.exchange.filesystem.ExchangeSourceFile;
import io.trino.plugin.exchange.filesystem.ExchangeStorageReader;
import io.trino.plugin.exchange.filesystem.ExchangeStorageWriter;
import io.trino.plugin.exchange.filesystem.FileStatus;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

/* loaded from: input_file:io/trino/plugin/exchange/hdfs/HadoopFileSystemExchangeStorage.class */
public class HadoopFileSystemExchangeStorage implements FileSystemExchangeStorage {
    private final int blockSize;
    private final FileSystem fileSystem;

    @ThreadSafe
    /* loaded from: input_file:io/trino/plugin/exchange/hdfs/HadoopFileSystemExchangeStorage$HadoopExchangeStorageReader.class */
    private static class HadoopExchangeStorageReader implements ExchangeStorageReader {
        private static final int INSTANCE_SIZE = SizeOf.instanceSize(HadoopExchangeStorageReader.class);
        private final FileSystem fileSystem;

        @GuardedBy("this")
        private final Queue<ExchangeSourceFile> sourceFiles;
        private final int blockSize;

        @GuardedBy("this")
        private InputStreamSliceInput sliceInput;

        @GuardedBy("this")
        private boolean closed;

        public HadoopExchangeStorageReader(FileSystem fileSystem, List<ExchangeSourceFile> list, int i) {
            this.fileSystem = (FileSystem) Objects.requireNonNull(fileSystem, "fileSystem is null");
            this.sourceFiles = new ArrayDeque((Collection) Objects.requireNonNull(list, "sourceFiles is null"));
            this.blockSize = i;
        }

        public synchronized Slice read() throws IOException {
            if (this.closed) {
                return null;
            }
            if (this.sliceInput != null && this.sliceInput.isReadable()) {
                return this.sliceInput.readSlice(this.sliceInput.readInt());
            }
            ExchangeSourceFile poll = this.sourceFiles.poll();
            if (poll == null) {
                close();
                return null;
            }
            this.sliceInput = getSliceInput(poll);
            return this.sliceInput.readSlice(this.sliceInput.readInt());
        }

        public ListenableFuture<Void> isBlocked() {
            return Futures.immediateVoidFuture();
        }

        public synchronized long getRetainedSize() {
            return INSTANCE_SIZE + (this.sliceInput == null ? 0L : this.sliceInput.getRetainedSize());
        }

        public synchronized boolean isFinished() {
            return this.closed;
        }

        public synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.sliceInput != null) {
                this.sliceInput.close();
                this.sliceInput = null;
            }
        }

        private InputStreamSliceInput getSliceInput(ExchangeSourceFile exchangeSourceFile) throws IOException {
            return new InputStreamSliceInput(this.fileSystem.open(new Path(exchangeSourceFile.getFileUri())), this.blockSize);
        }
    }

    @NotThreadSafe
    /* loaded from: input_file:io/trino/plugin/exchange/hdfs/HadoopFileSystemExchangeStorage$HadoopExchangeStorageWriter.class */
    private static class HadoopExchangeStorageWriter implements ExchangeStorageWriter {
        private static final int INSTANCE_SIZE = SizeOf.instanceSize(HadoopExchangeStorageReader.class);
        private final OutputStream outputStream;

        public HadoopExchangeStorageWriter(FileSystem fileSystem, URI uri) {
            try {
                this.outputStream = fileSystem.create(new Path(uri), true);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        public ListenableFuture<Void> write(Slice slice) {
            try {
                this.outputStream.write(slice.getBytes());
                return Futures.immediateVoidFuture();
            } catch (IOException | RuntimeException e) {
                return Futures.immediateFailedFuture(e);
            }
        }

        public ListenableFuture<Void> finish() {
            try {
                this.outputStream.close();
                return Futures.immediateVoidFuture();
            } catch (IOException | RuntimeException e) {
                return Futures.immediateFailedFuture(e);
            }
        }

        public ListenableFuture<Void> abort() {
            try {
                this.outputStream.close();
                return Futures.immediateVoidFuture();
            } catch (IOException | RuntimeException e) {
                return Futures.immediateFailedFuture(e);
            }
        }

        public long getRetainedSize() {
            return INSTANCE_SIZE;
        }
    }

    @Inject
    public HadoopFileSystemExchangeStorage(ExchangeHdfsConfig exchangeHdfsConfig) throws IOException {
        Configuration newEmptyConfiguration = ConfigurationInstantiator.newEmptyConfiguration();
        for (File file : exchangeHdfsConfig.getResourceConfigFiles()) {
            Preconditions.checkArgument(file.exists(), "File does not exist: %s", file);
            newEmptyConfiguration.addResource(new Path(file.getPath()));
        }
        this.fileSystem = FileSystem.get(newEmptyConfiguration);
        this.blockSize = Math.toIntExact(exchangeHdfsConfig.getHdfsStorageBlockSize().toBytes());
    }

    public void createDirectories(URI uri) throws IOException {
        this.fileSystem.mkdirs(new Path(uri));
    }

    public ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> list, int i) {
        return new HadoopExchangeStorageReader(this.fileSystem, list, this.blockSize);
    }

    public ExchangeStorageWriter createExchangeStorageWriter(URI uri) {
        return new HadoopExchangeStorageWriter(this.fileSystem, uri);
    }

    public ListenableFuture<Void> createEmptyFile(URI uri) {
        try {
            this.fileSystem.createNewFile(new Path(uri));
            return Futures.immediateVoidFuture();
        } catch (IOException e) {
            return Futures.immediateFailedFuture(e);
        }
    }

    public ListenableFuture<Void> deleteRecursively(List<URI> list) {
        Iterator<URI> it = list.iterator();
        while (it.hasNext()) {
            try {
                this.fileSystem.delete(new Path(it.next()), true);
            } catch (IOException | RuntimeException e) {
                return Futures.immediateFailedFuture(e);
            }
        }
        return Futures.immediateVoidFuture();
    }

    public ListenableFuture<List<FileStatus>> listFilesRecursively(URI uri) {
        ImmutableList.Builder builder = ImmutableList.builder();
        try {
            RemoteIterator listFiles = this.fileSystem.listFiles(new Path(uri), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus locatedFileStatus = (LocatedFileStatus) listFiles.next();
                builder.add(new FileStatus(locatedFileStatus.getPath().toString(), locatedFileStatus.getLen()));
            }
            return Futures.immediateFuture(builder.build());
        } catch (IOException e) {
            return Futures.immediateFailedFuture(e);
        }
    }

    public int getWriteBufferSize() {
        return this.blockSize;
    }

    public void close() {
    }
}
