package io.trino.plugin.raptor.legacy.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.plugin.raptor.legacy.RaptorErrorCode;
import io.trino.plugin.raptor.legacy.backup.BackupStore;
import io.trino.plugin.raptor.legacy.metadata.ShardManager;
import io.trino.plugin.raptor.legacy.metadata.ShardMetadata;
import io.trino.plugin.raptor.legacy.util.PrioritizedFifoExecutor;
import io.trino.spi.NodeManager;
import io.trino.spi.TrinoException;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.weakref.jmx.Flatten;
import org.weakref.jmx.Managed;

/* loaded from: input_file:io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager.class */
public class ShardRecoveryManager {
    // Class-wide logger.
    private static final Logger log = Logger.get(ShardRecoveryManager.class);
    // Resolves the storage, staging and quarantine file locations of a shard and creates parent directories.
    private final StorageService storageService;
    // Source of shard backups; when empty, start() never schedules background discovery.
    private final Optional<BackupStore> backupStore;
    // Identifier of the current node, passed to ShardManager.getNodeShards() during discovery.
    private final String nodeIdentifier;
    // Metadata lookup for single shards (getShard) and for this node's shards (getNodeShards).
    private final ShardManager shardManager;
    // Delay between background discovery runs; also the upper bound of the random start-up jitter.
    private final Duration missingShardDiscoveryInterval;
    // Flipped by start() via compareAndSet so that the discovery task is scheduled at most once.
    private final AtomicBoolean started;
    // Queue of pending recoveries, keyed by MissingShard so equal requests share one future.
    private final MissingShardsQueue shardQueue;
    // Scheduled pool of size one that runs the discovery task (scheduled and on-demand).
    private final ScheduledExecutorService missingShardExecutor;
    // Cached thread pool handed to the PrioritizedFifoExecutor that runs the recovery tasks.
    private final ExecutorService executorService;
    // Recovery counters and data-rate statistics, exposed through getStats().
    private final ShardRecoveryStats stats;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager$MissingShard.class */
    /**
     * Immutable description of a shard that has to be restored: its UUID, the expected
     * size, the optional expected xxhash64 checksum and whether the request is "active".
     *
     * <p>Equality and hashing consider only the UUID and the active flag; size and checksum
     * are deliberately left out of both.
     */
    public static final class MissingShard {
        private final UUID shardUuid;
        private final long shardSize;
        private final OptionalLong shardXxhash64;
        private final boolean active;

        /**
         * @param shardUuid identifier of the shard, must not be null
         * @param shardSize expected size of the shard file
         * @param shardXxhash64 expected checksum, must not be null (may be empty)
         * @param active flag carried over to the recovery task for prioritization
         */
        public MissingShard(UUID shardUuid, long shardSize, OptionalLong shardXxhash64, boolean active) {
            this.shardUuid = Objects.requireNonNull(shardUuid, "shardUuid is null");
            this.shardSize = shardSize;
            this.shardXxhash64 = Objects.requireNonNull(shardXxhash64, "shardXxhash64 is null");
            this.active = active;
        }

        public UUID getShardUuid() {
            return shardUuid;
        }

        public long getShardSize() {
            return shardSize;
        }

        public OptionalLong getShardXxhash64() {
            return shardXxhash64;
        }

        public boolean isActive() {
            return active;
        }

        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            // The class is final, so an instanceof test is the same as comparing getClass().
            if (!(other instanceof MissingShard)) {
                return false;
            }
            MissingShard that = (MissingShard) other;
            return active == that.active && Objects.equals(shardUuid, that.shardUuid);
        }

        @Override
        public int hashCode() {
            return Objects.hash(shardUuid, active);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("shardUuid", shardUuid)
                    .add("active", active)
                    .toString();
        }
    }

    /**
     * Orders recovery tasks by their active flag only: an active task compares as smaller
     * than an inactive one, and tasks with the same flag compare as equal.
     */
    @VisibleForTesting
    static class MissingShardComparator implements Comparator<MissingShardRunnable> {
        MissingShardComparator() {
        }

        @Override
        public int compare(MissingShardRunnable left, MissingShardRunnable right) {
            boolean leftActive = left.isActive();
            boolean rightActive = right.isActive();
            // Arguments are swapped on purpose so that "true" (active) yields -1.
            return Boolean.compare(rightActive, leftActive);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager$MissingShardRecovery.class */
    /**
     * Recovery task for a single shard. Running it delegates to
     * {@link ShardRecoveryManager#restoreFromBackup} of the enclosing manager.
     */
    public class MissingShardRecovery implements MissingShardRunnable {
        private final UUID shardUuid;
        private final long shardSize;
        private final OptionalLong shardXxhash64;
        private final boolean active;

        /**
         * @param shardUuid identifier of the shard to restore, must not be null
         * @param shardSize expected size of the restored file
         * @param shardXxhash64 expected checksum, must not be null (may be empty)
         * @param active value reported by {@link #isActive()}
         */
        public MissingShardRecovery(UUID shardUuid, long shardSize, OptionalLong shardXxhash64, boolean active) {
            this.shardUuid = Objects.requireNonNull(shardUuid, "shardUuid is null");
            this.shardSize = shardSize;
            this.shardXxhash64 = Objects.requireNonNull(shardXxhash64, "shardXxhash64 is null");
            this.active = active;
        }

        @Override
        public void run() {
            restoreFromBackup(shardUuid, shardSize, shardXxhash64);
        }

        @Override
        public boolean isActive() {
            return active;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager$MissingShardRunnable.class */
    /**
     * A runnable recovery task that additionally reports an "active" flag.
     * MissingShardComparator orders tasks by this flag.
     */
    public interface MissingShardRunnable extends Runnable {
        /** Returns the active flag this task was created with. */
        boolean isActive();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/plugin/raptor/legacy/storage/ShardRecoveryManager$MissingShardsQueue.class */
    public class MissingShardsQueue {
        private final LoadingCache<MissingShard, ListenableFuture<Void>> queuedMissingShards;

        public MissingShardsQueue(final PrioritizedFifoExecutor<MissingShardRunnable> prioritizedFifoExecutor) {
            Objects.requireNonNull(prioritizedFifoExecutor, "shardRecoveryExecutor is null");
            this.queuedMissingShards = CacheBuilder.newBuilder().build(new CacheLoader<MissingShard, ListenableFuture<Void>>() { // from class: io.trino.plugin.raptor.legacy.storage.ShardRecoveryManager.MissingShardsQueue.1
                public ListenableFuture<Void> load(MissingShard missingShard) {
                    ListenableFuture<Void> submit = prioritizedFifoExecutor.submit(new MissingShardRecovery(missingShard.getShardUuid(), missingShard.getShardSize(), missingShard.getShardXxhash64(), missingShard.isActive()));
                    submit.addListener(() -> {
                        MissingShardsQueue.this.queuedMissingShards.invalidate(missingShard);
                    }, MoreExecutors.directExecutor());
                    return submit;
                }
            });
        }

        public ListenableFuture<Void> submit(MissingShard missingShard) throws ExecutionException {
            return (ListenableFuture) this.queuedMissingShards.get(missingShard);
        }
    }

    /**
     * Injection constructor. Reads the discovery interval and the number of recovery
     * threads from the storage manager configuration and delegates to the full constructor.
     */
    @Inject
    public ShardRecoveryManager(StorageService storageService, Optional<BackupStore> backupStore, NodeManager nodeManager, ShardManager shardManager, StorageManagerConfig config) {
        this(
                storageService,
                backupStore,
                nodeManager,
                shardManager,
                config.getMissingShardDiscoveryInterval(),
                config.getRecoveryThreads());
    }

    public ShardRecoveryManager(StorageService storageService, Optional<BackupStore> optional, NodeManager nodeManager, ShardManager shardManager, Duration duration, int i) {
        this.started = new AtomicBoolean();
        this.missingShardExecutor = Executors.newScheduledThreadPool(1, Threads.daemonThreadsNamed("missing-shard-discovery"));
        this.executorService = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("shard-recovery-%s"));
        this.storageService = (StorageService) Objects.requireNonNull(storageService, "storageService is null");
        this.backupStore = (Optional) Objects.requireNonNull(optional, "backupStore is null");
        this.nodeIdentifier = ((NodeManager) Objects.requireNonNull(nodeManager, "nodeManager is null")).getCurrentNode().getNodeIdentifier();
        this.shardManager = (ShardManager) Objects.requireNonNull(shardManager, "shardManager is null");
        this.missingShardDiscoveryInterval = (Duration) Objects.requireNonNull(duration, "missingShardDiscoveryInterval is null");
        this.shardQueue = new MissingShardsQueue(new PrioritizedFifoExecutor(this.executorService, i, new MissingShardComparator()));
        this.stats = new ShardRecoveryStats();
    }

    /**
     * Schedules the periodic missing-shard discovery. Does nothing when no backup store
     * is configured, and schedules at most once however often it is called.
     */
    @PostConstruct
    public void start() {
        if (backupStore.isEmpty()) {
            // Without a backup store there is nothing to recover from.
            return;
        }
        if (started.compareAndSet(false, true)) {
            scheduleRecoverMissingShards();
        }
    }

    /**
     * Stops both executors immediately: first the recovery pool, then the discovery scheduler.
     */
    @PreDestroy
    public void shutdown() {
        executorService.shutdownNow();
        missingShardExecutor.shutdownNow();
    }

    /**
     * Schedules the background discovery task with a fixed delay of
     * {@code missingShardDiscoveryInterval} between runs.
     *
     * <p>Each run first sleeps for a random number of seconds in
     * {@code [1, interval)} and then enqueues the shards that need recovery. The jitter is
     * skipped when the interval rounds to one second or less, because
     * {@code ThreadLocalRandom.nextLong(origin, bound)} rejects an empty range and an
     * exception escaping a fixed-delay task would cancel every later run.
     *
     * <p>If the sleep is interrupted, the interrupt flag is restored and the run ends
     * without enqueueing anything.
     */
    private void scheduleRecoverMissingShards() {
        this.missingShardExecutor.scheduleWithFixedDelay(() -> {
            try {
                long intervalSeconds = this.missingShardDiscoveryInterval.roundTo(TimeUnit.SECONDS);
                // nextLong(1, bound) requires bound > 1; shorter intervals run without jitter.
                if (intervalSeconds > 1L) {
                    TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextLong(1L, intervalSeconds));
                }
            } catch (InterruptedException e) {
                // Interrupted (e.g. by shutdownNow): keep the flag and do not start new work.
                Thread.currentThread().interrupt();
                return;
            }
            enqueueMissingShards();
        }, 0L, this.missingShardDiscoveryInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Managed operation that queues one discovery pass on the discovery executor,
     * independent of the periodic schedule.
     */
    @Managed
    public void recoverMissingShards() {
        Runnable discovery = this::enqueueMissingShards;
        missingShardExecutor.submit(discovery);
    }

    /**
     * Finds the shards that need recovery and submits a non-active recovery request for
     * each of them, bumping the background-recovery counter once per shard.
     *
     * <p>A failed recovery is logged as a warning through a callback on its future. Any
     * throwable raised while building the requests is logged as an error and not rethrown.
     */
    private synchronized void enqueueMissingShards() {
        try {
            for (ShardMetadata shard : getMissingShards()) {
                stats.incrementBackgroundShardRecovery();
                MissingShard request = new MissingShard(
                        shard.getShardUuid(),
                        shard.getCompressedSize(),
                        shard.getXxhash64(),
                        false);
                ListenableFuture<Void> recovery = shardQueue.submit(request);
                MoreFutures.addExceptionCallback(
                        recovery,
                        failure -> log.warn(failure, "Error recovering shard: %s", shard.getShardUuid()));
            }
        } catch (Throwable failure) {
            log.error(failure, "Error creating shard recovery tasks");
        }
    }

    /**
     * Returns the shards reported by the shard manager for this node whose local file
     * is absent or has an unexpected length.
     */
    private Set<ShardMetadata> getMissingShards() {
        return shardManager.getNodeShards(nodeIdentifier).stream()
                .filter(shard -> shardNeedsRecovery(shard.getShardUuid(), shard.getCompressedSize()))
                .collect(Collectors.toSet());
    }

    /**
     * Tells whether the local storage file of a shard is missing or differs in length
     * from the expected size. File contents are not inspected here.
     */
    private boolean shardNeedsRecovery(UUID shardUuid, long shardSize) {
        File storageFile = storageService.getStorageFile(shardUuid);
        return !storageFile.exists() || storageFile.length() != shardSize;
    }

    /**
     * Queues an active recovery for the given shard and returns its future.
     *
     * @param shardUuid identifier of the shard to recover
     * @return future of the queued recovery
     * @throws TrinoException with RAPTOR_ERROR if the shard manager does not know the shard
     * @throws ExecutionException if submitting to the recovery queue fails
     */
    public Future<Void> recoverShard(UUID shardUuid) throws ExecutionException {
        ShardMetadata shard = shardManager.getShard(shardUuid);
        if (shard == null) {
            throw new TrinoException(RaptorErrorCode.RAPTOR_ERROR, "Shard does not exist in database: " + shardUuid);
        }
        stats.incrementActiveShardRecovery();
        MissingShard request = new MissingShard(shardUuid, shard.getCompressedSize(), shard.getXxhash64(), true);
        return shardQueue.submit(request);
    }

    /**
     * Restores one shard from the backup store into its local storage location.
     *
     * <p>Steps: verify that the backup exists; return early if a valid local file is
     * already present (a corrupt one is quarantined first); copy the backup into a
     * uniquely named staging file; move it atomically to the storage location; finally
     * validate size and checksum of the result.
     *
     * <p>The recovery-failure counter is incremented exactly once per failed attempt, in
     * the single {@code catch} below, and the staging file is removed on every exit path.
     *
     * @param shardUuid identifier of the shard
     * @param shardSize expected length of the shard file
     * @param shardXxhash64 expected checksum; the checksum test is skipped when empty
     * @throws TrinoException RAPTOR_RECOVERY_ERROR when the backup is missing, the move
     *         fails or the file is absent afterwards; RAPTOR_BACKUP_CORRUPTION when the
     *         restored file fails validation
     */
    @VisibleForTesting
    void restoreFromBackup(UUID shardUuid, long shardSize, OptionalLong shardXxhash64) {
        File storageFile = this.storageService.getStorageFile(shardUuid);
        BackupStore backup = this.backupStore.get();

        if (!backup.shardExists(shardUuid)) {
            this.stats.incrementShardRecoveryBackupNotFound();
            throw new TrinoException(RaptorErrorCode.RAPTOR_RECOVERY_ERROR, "No backup file found for shard: " + shardUuid);
        }

        if (storageFile.exists()) {
            if (!isFileCorrupt(storageFile, shardSize, shardXxhash64)) {
                // A valid local copy is already in place; nothing to restore.
                return;
            }
            this.stats.incrementCorruptLocalFile();
            quarantineFile(shardUuid, storageFile, "Local file is corrupt.");
        }

        // Copy into a uniquely named staging file first so the final move is atomic.
        File stagingFile = temporarySuffix(this.storageService.getStagingFile(shardUuid));
        this.storageService.createParents(stagingFile);

        log.info("Copying shard %s from backup...", shardUuid);
        long start = System.nanoTime();
        try {
            backup.restoreShard(shardUuid, stagingFile);

            Duration elapsed = Duration.nanosSince(start);
            DataSize size = DataSize.succinctBytes(stagingFile.length());
            DataSize rate = dataRate(size, elapsed).succinct();
            this.stats.addShardRecoveryDataRate(rate, size, elapsed);
            log.info("Copied shard %s from backup in %s (%s at %s/s)", shardUuid, elapsed, size, rate);

            this.storageService.createParents(storageFile);
            try {
                Files.move(stagingFile.toPath(), storageFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
            } catch (FileAlreadyExistsException ignored) {
                // The target appeared concurrently; it is validated below like a moved file.
            } catch (IOException e) {
                throw new TrinoException(RaptorErrorCode.RAPTOR_RECOVERY_ERROR, "Failed to move shard: " + shardUuid, e);
            }

            if (!storageFile.exists()) {
                throw new TrinoException(RaptorErrorCode.RAPTOR_RECOVERY_ERROR, "File does not exist after recovery: " + shardUuid);
            }
            if (isFileCorrupt(storageFile, shardSize, shardXxhash64)) {
                this.stats.incrementCorruptRecoveredFile();
                quarantineFile(shardUuid, storageFile, "Local file is corrupt after recovery.");
                throw new TrinoException(RaptorErrorCode.RAPTOR_BACKUP_CORRUPTION, "Backup is corrupt after read: " + shardUuid);
            }
            this.stats.incrementShardRecoverySuccess();
        } catch (TrinoException e) {
            // Single accounting point: counting at the throw sites as well would double count.
            this.stats.incrementShardRecoveryFailure();
            throw e;
        } finally {
            // Best-effort cleanup; after a successful move the staging file no longer exists.
            stagingFile.delete();
        }
    }

    /**
     * Moves a corrupt shard file to the shard's quarantine location with a
     * {@code .corrupt} suffix. If that quarantine file already exists, only a warning is
     * logged and the given file is left where it is. If the move fails, the failure is
     * logged and the given file is deleted instead.
     *
     * @param shardUuid identifier of the shard the file belongs to
     * @param file the corrupt file
     * @param message prefix for the log messages
     */
    private void quarantineFile(UUID shardUuid, File file, String message) {
        String quarantinePath = storageService.getQuarantineFile(shardUuid).getPath() + ".corrupt";
        File quarantine = new File(quarantinePath);

        if (!quarantine.exists()) {
            log.error("%s Quarantining corrupt file: %s", message, quarantine);
            try {
                Files.move(file.toPath(), quarantine.toPath(), StandardCopyOption.ATOMIC_MOVE);
            } catch (IOException e) {
                log.warn(e, "Quarantine of corrupt file failed: " + quarantine);
                file.delete();
            }
            return;
        }

        log.warn("%s Quarantine already exists: %s", message, quarantine);
    }

    /**
     * A file counts as corrupt when its length differs from the expected size or, if an
     * expected checksum is given, when its xxhash64 differs from it. The checksum is
     * only computed when the length matches.
     */
    private static boolean isFileCorrupt(File file, long size, OptionalLong xxhash64) {
        if (file.length() != size) {
            return true;
        }
        if (!xxhash64.isPresent()) {
            // No checksum recorded: a matching length is all that can be verified.
            return false;
        }
        return RaptorStorageManager.xxhash64(file) != xxhash64.getAsLong();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Computes the transfer rate in bytes per second for the given amount of data and
     * elapsed time, rounded to whole bytes. A rate that is not a finite number (for
     * example with a zero duration) is reported as zero.
     */
    public static DataSize dataRate(DataSize size, Duration duration) {
        double bytesPerSecond = size.toBytes() / duration.getValue(TimeUnit.SECONDS);
        long rounded = Double.isFinite(bytesPerSecond) ? Math.round(bytesPerSecond) : 0L;
        return DataSize.succinctBytes(rounded);
    }

    /**
     * Returns a sibling path of the given file with {@code .tmp-<random UUID>} appended,
     * so that every call yields a distinct name.
     */
    private static File temporarySuffix(File file) {
        String basePath = file.getPath();
        String suffix = ".tmp-" + UUID.randomUUID();
        return new File(basePath + suffix);
    }

    /** Exposes the recovery statistics of this manager as a flattened managed object. */
    @Managed
    @Flatten
    public ShardRecoveryStats getStats() {
        return stats;
    }

    /**
     * Adapts a throwable consumer to a {@link FutureCallback} that ignores successful
     * results and forwards failures to the consumer.
     */
    private static <T> FutureCallback<T> failureCallback(final Consumer<Throwable> consumer) {
        return new FutureCallback<T>() {
            @Override
            public void onSuccess(T result) {
                // Success needs no handling.
            }

            @Override
            public void onFailure(Throwable failure) {
                consumer.accept(failure);
            }
        };
    }
}
