package io.datarouter.bytes.kvfile;

import io.datarouter.bytes.ByteLength;
import io.datarouter.bytes.ByteTool;
import io.datarouter.bytes.CountingInputStream;
import io.datarouter.bytes.kvfile.KvFileCompactorFileCache;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.text.DecimalFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/bytes/kvfile/KvFileMerger.class */
public class KvFileMerger {
    private static final Logger logger = LoggerFactory.getLogger(KvFileMerger.class);
    private final KvFileMergerParams params;
    private final KvFileCompactorFileCache.KvFileMergePlan plan;
    private final Supplier<Boolean> shouldStop;
    private final AtomicLong waitForChunksNs = new AtomicLong();
    private final AtomicLong waitForCollatorNs = new AtomicLong();
    private final AtomicLong waitForShouldStopNs = new AtomicLong();
    private final AtomicLong waitForEncoderNs = new AtomicLong();
    private final AtomicLong recordsRead = new AtomicLong();
    private final AtomicLong recordsWritten = new AtomicLong();
    private final AtomicLong recordsWrittenSinceLastLog = new AtomicLong();
    private final AtomicLong blocksRead = new AtomicLong();
    private final AtomicLong blocksWritten = new AtomicLong();
    private final AtomicLong bytesRead = new AtomicLong();
    private final AtomicLong bytesWritten = new AtomicLong();
    private final Instant startTime = Instant.now();
    private final AtomicLong lastLogTimeNs = new AtomicLong(0);

    /* loaded from: input_file:io/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams.class */
    public static final class KvFileMergerByteReaderParams extends Record {
        private final int memoryFanIn;
        private final int streamingFanIn;
        private final Threads makeReadersThreads;
        private final boolean readParallel;
        private final ExecutorService readParallelExec;
        private final ByteLength readBufferSize;
        private final ByteLength chunkSize;

        public KvFileMergerByteReaderParams(int i, int i2, Threads threads, boolean z, ExecutorService executorService, ByteLength byteLength, ByteLength byteLength2) {
            this.memoryFanIn = i;
            this.streamingFanIn = i2;
            this.makeReadersThreads = threads;
            this.readParallel = z;
            this.readParallelExec = executorService;
            this.readBufferSize = byteLength;
            this.chunkSize = byteLength2;
        }

        public int totalThreads() {
            return Math.toIntExact(this.readBufferSize.toBytes() / this.chunkSize.toBytes());
        }

        public int memoryFanIn() {
            return this.memoryFanIn;
        }

        public int streamingFanIn() {
            return this.streamingFanIn;
        }

        public Threads makeReadersThreads() {
            return this.makeReadersThreads;
        }

        public boolean readParallel() {
            return this.readParallel;
        }

        public ExecutorService readParallelExec() {
            return this.readParallelExec;
        }

        public ByteLength readBufferSize() {
            return this.readBufferSize;
        }

        public ByteLength chunkSize() {
            return this.chunkSize;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, KvFileMergerByteReaderParams.class), KvFileMergerByteReaderParams.class, "memoryFanIn;streamingFanIn;makeReadersThreads;readParallel;readParallelExec;readBufferSize;chunkSize", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->memoryFanIn:I", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->streamingFanIn:I", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->makeReadersThreads:Lio/datarouter/scanner/Threads;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->readParallel:Z", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->readParallelExec:Ljava/util/concurrent/ExecutorService;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->readBufferSize:Lio/datarouter/bytes/ByteLength;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->chunkSize:Lio/datarouter/bytes/ByteLength;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, KvFileMergerByteReaderParams.class), KvFileMergerByteReaderParams.class, "memoryFanIn;streamingFanIn;makeReadersThreads;readParallel;readParallelExec;readBufferSize;chunkSize", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->memoryFanIn:I", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->streamingFanIn:I", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->makeReadersThreads:Lio/datarouter/scanner/Threads;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->readParallel:Z", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->readParallelExec:Ljava/util/concurrent/ExecutorService;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->readBufferSize:Lio/datarouter/bytes/ByteLength;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->chunkSize:Lio/datarouter/bytes/ByteLength;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, KvFileMergerByteReaderParams.class, Object.class), KvFileMergerByteReaderParams.class, "memoryFanIn;streamingFanIn;makeReadersThreads;readParallel;readParallelExec;readBufferSize;chunkSize", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->memoryFanIn:I", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->streamingFanIn:I", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->makeReadersThreads:Lio/datarouter/scanner/Threads;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->readParallel:Z", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->readParallelExec:Ljava/util/concurrent/ExecutorService;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->readBufferSize:Lio/datarouter/bytes/ByteLength;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;->chunkSize:Lio/datarouter/bytes/ByteLength;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    /* loaded from: input_file:io/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams.class */
    public static final class KvFileMergerEncodeParams extends Record {
        private final ByteLength minBlockSize;
        private final int encodeBatchSize;
        private final Threads encodeThreads;

        public KvFileMergerEncodeParams(ByteLength byteLength, int i, Threads threads) {
            this.minBlockSize = byteLength;
            this.encodeBatchSize = i;
            this.encodeThreads = threads;
        }

        public ByteLength minBlockSize() {
            return this.minBlockSize;
        }

        public int encodeBatchSize() {
            return this.encodeBatchSize;
        }

        public Threads encodeThreads() {
            return this.encodeThreads;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, KvFileMergerEncodeParams.class), KvFileMergerEncodeParams.class, "minBlockSize;encodeBatchSize;encodeThreads", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams;->minBlockSize:Lio/datarouter/bytes/ByteLength;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams;->encodeBatchSize:I", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams;->encodeThreads:Lio/datarouter/scanner/Threads;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, KvFileMergerEncodeParams.class), KvFileMergerEncodeParams.class, "minBlockSize;encodeBatchSize;encodeThreads", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams;->minBlockSize:Lio/datarouter/bytes/ByteLength;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams;->encodeBatchSize:I", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams;->encodeThreads:Lio/datarouter/scanner/Threads;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, KvFileMergerEncodeParams.class, Object.class), KvFileMergerEncodeParams.class, "minBlockSize;encodeBatchSize;encodeThreads", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams;->minBlockSize:Lio/datarouter/bytes/ByteLength;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams;->encodeBatchSize:I", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams;->encodeThreads:Lio/datarouter/scanner/Threads;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    /* loaded from: input_file:io/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerKvReaderParams.class */
    public static final class KvFileMergerKvReaderParams extends Record {
        private final int parseBatchSize;
        private final Threads parseThreads;

        public KvFileMergerKvReaderParams(int i, Threads threads) {
            this.parseBatchSize = i;
            this.parseThreads = threads;
        }

        public int parseBatchSize() {
            return this.parseBatchSize;
        }

        public Threads parseThreads() {
            return this.parseThreads;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, KvFileMergerKvReaderParams.class), KvFileMergerKvReaderParams.class, "parseBatchSize;parseThreads", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerKvReaderParams;->parseBatchSize:I", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerKvReaderParams;->parseThreads:Lio/datarouter/scanner/Threads;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, KvFileMergerKvReaderParams.class), KvFileMergerKvReaderParams.class, "parseBatchSize;parseThreads", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerKvReaderParams;->parseBatchSize:I", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerKvReaderParams;->parseThreads:Lio/datarouter/scanner/Threads;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, KvFileMergerKvReaderParams.class, Object.class), KvFileMergerKvReaderParams.class, "parseBatchSize;parseThreads", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerKvReaderParams;->parseBatchSize:I", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerKvReaderParams;->parseThreads:Lio/datarouter/scanner/Threads;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    /* loaded from: input_file:io/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams.class */
    public static final class KvFileMergerParams extends Record {
        private final KvFileMergerStorageParams storageParams;
        private final KvFileMergerByteReaderParams byteReaderParams;
        private final KvFileMergerKvReaderParams kvReaderParams;
        private final KvFileMergerEncodeParams encodeParams;
        private final KvFileMergerWriteParams writeParams;
        private final Duration heartbeatPeriod;
        private final Duration logPeriod;

        public KvFileMergerParams(KvFileMergerStorageParams kvFileMergerStorageParams, KvFileMergerByteReaderParams kvFileMergerByteReaderParams, KvFileMergerKvReaderParams kvFileMergerKvReaderParams, KvFileMergerEncodeParams kvFileMergerEncodeParams, KvFileMergerWriteParams kvFileMergerWriteParams, Duration duration, Duration duration2) {
            this.storageParams = kvFileMergerStorageParams;
            this.byteReaderParams = kvFileMergerByteReaderParams;
            this.kvReaderParams = kvFileMergerKvReaderParams;
            this.encodeParams = kvFileMergerEncodeParams;
            this.writeParams = kvFileMergerWriteParams;
            this.heartbeatPeriod = duration;
            this.logPeriod = duration2;
        }

        public KvFileMergerStorageParams storageParams() {
            return this.storageParams;
        }

        public KvFileMergerByteReaderParams byteReaderParams() {
            return this.byteReaderParams;
        }

        public KvFileMergerKvReaderParams kvReaderParams() {
            return this.kvReaderParams;
        }

        public KvFileMergerEncodeParams encodeParams() {
            return this.encodeParams;
        }

        public KvFileMergerWriteParams writeParams() {
            return this.writeParams;
        }

        public Duration heartbeatPeriod() {
            return this.heartbeatPeriod;
        }

        public Duration logPeriod() {
            return this.logPeriod;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, KvFileMergerParams.class), KvFileMergerParams.class, "storageParams;byteReaderParams;kvReaderParams;encodeParams;writeParams;heartbeatPeriod;logPeriod", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->storageParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerStorageParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->byteReaderParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->kvReaderParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerKvReaderParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->encodeParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->writeParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerWriteParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->heartbeatPeriod:Ljava/time/Duration;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->logPeriod:Ljava/time/Duration;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, KvFileMergerParams.class), KvFileMergerParams.class, "storageParams;byteReaderParams;kvReaderParams;encodeParams;writeParams;heartbeatPeriod;logPeriod", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->storageParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerStorageParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->byteReaderParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->kvReaderParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerKvReaderParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->encodeParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->writeParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerWriteParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->heartbeatPeriod:Ljava/time/Duration;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->logPeriod:Ljava/time/Duration;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, KvFileMergerParams.class, Object.class), KvFileMergerParams.class, "storageParams;byteReaderParams;kvReaderParams;encodeParams;writeParams;heartbeatPeriod;logPeriod", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->storageParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerStorageParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->byteReaderParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerByteReaderParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->kvReaderParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerKvReaderParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->encodeParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerEncodeParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->writeParams:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerWriteParams;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->heartbeatPeriod:Ljava/time/Duration;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerParams;->logPeriod:Ljava/time/Duration;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    /* loaded from: input_file:io/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerStorageParams.class */
    public static final class KvFileMergerStorageParams extends Record {
        private final KvFileStorage storage;
        private final Supplier<String> filenameSupplier;

        public KvFileMergerStorageParams(KvFileStorage kvFileStorage, Supplier<String> supplier) {
            this.storage = kvFileStorage;
            this.filenameSupplier = supplier;
        }

        public KvFileStorage storage() {
            return this.storage;
        }

        public Supplier<String> filenameSupplier() {
            return this.filenameSupplier;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, KvFileMergerStorageParams.class), KvFileMergerStorageParams.class, "storage;filenameSupplier", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerStorageParams;->storage:Lio/datarouter/bytes/kvfile/KvFileStorage;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerStorageParams;->filenameSupplier:Ljava/util/function/Supplier;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, KvFileMergerStorageParams.class), KvFileMergerStorageParams.class, "storage;filenameSupplier", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerStorageParams;->storage:Lio/datarouter/bytes/kvfile/KvFileStorage;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerStorageParams;->filenameSupplier:Ljava/util/function/Supplier;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, KvFileMergerStorageParams.class, Object.class), KvFileMergerStorageParams.class, "storage;filenameSupplier", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerStorageParams;->storage:Lio/datarouter/bytes/kvfile/KvFileStorage;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerStorageParams;->filenameSupplier:Ljava/util/function/Supplier;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    /* loaded from: input_file:io/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerWriteParams.class */
    public static final class KvFileMergerWriteParams extends Record {
        private final Threads writeThreads;
        private final ByteLength partSize;

        public KvFileMergerWriteParams(Threads threads, ByteLength byteLength) {
            this.writeThreads = threads;
            this.partSize = byteLength;
        }

        public Threads writeThreads() {
            return this.writeThreads;
        }

        public ByteLength partSize() {
            return this.partSize;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, KvFileMergerWriteParams.class), KvFileMergerWriteParams.class, "writeThreads;partSize", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerWriteParams;->writeThreads:Lio/datarouter/scanner/Threads;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerWriteParams;->partSize:Lio/datarouter/bytes/ByteLength;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, KvFileMergerWriteParams.class), KvFileMergerWriteParams.class, "writeThreads;partSize", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerWriteParams;->writeThreads:Lio/datarouter/scanner/Threads;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerWriteParams;->partSize:Lio/datarouter/bytes/ByteLength;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, KvFileMergerWriteParams.class, Object.class), KvFileMergerWriteParams.class, "writeThreads;partSize", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerWriteParams;->writeThreads:Lio/datarouter/scanner/Threads;", "FIELD:Lio/datarouter/bytes/kvfile/KvFileMerger$KvFileMergerWriteParams;->partSize:Lio/datarouter/bytes/ByteLength;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    public KvFileMerger(KvFileMergerParams kvFileMergerParams, KvFileCompactorFileCache.KvFileMergePlan kvFileMergePlan, Supplier<Boolean> supplier) {
        this.params = kvFileMergerParams;
        this.plan = kvFileMergePlan;
        this.shouldStop = supplier;
        ByteLength ofBytes = ByteLength.ofBytes(10000 * kvFileMergerParams.writeParams().partSize().toBytes());
        if (kvFileMergePlan.totalInputSize().toBytes() > ofBytes.toBytes() / 2) {
            logger.warn("totalInputSize={} is greater than half of maxUploadSizeForS3={}.  Consider increasing partSize.", kvFileMergePlan.totalInputSize().toDisplay(), ofBytes.toDisplay());
        }
    }

    public KvFileNameAndSize merge() {
        Scanner batch = ((Scanner) makeReaders().listTo(this.plan.collatorStrategy().method)).batchByMinSize(this.params.encodeParams().minBlockSize().toBytes(), (v0) -> {
            return v0.length();
        }).batch(this.params.encodeParams().encodeBatchSize());
        AtomicLong atomicLong = this.waitForCollatorNs;
        atomicLong.getClass();
        Scanner periodic = batch.timeNanos((v1) -> {
            r1.addAndGet(v1);
        }).periodic(this.params.heartbeatPeriod(), list -> {
            throwIfShouldStop();
        });
        AtomicLong atomicLong2 = this.waitForShouldStopNs;
        atomicLong2.getClass();
        Scanner scanner = (Scanner) periodic.timeNanos((v1) -> {
            r1.addAndGet(v1);
        }).periodic(this.params.logPeriod(), list2 -> {
            logIntermediateProgress();
        }).apply(this::encodeBlocksToBytes);
        AtomicLong atomicLong3 = this.waitForEncoderNs;
        atomicLong3.getClass();
        KvFileNameAndSize kvFileNameAndSize = (KvFileNameAndSize) scanner.timeNanos((v1) -> {
            r1.addAndGet(v1);
        }).concat((v0) -> {
            return Scanner.of(v0);
        }).apply(this::write);
        logProgress(true, kvFileNameAndSize);
        return kvFileNameAndSize;
    }

    private Scanner<KvFileReader> makeReaders() {
        AtomicInteger atomicInteger = new AtomicInteger(this.plan.files().size());
        AtomicInteger atomicInteger2 = new AtomicInteger(this.params.byteReaderParams().totalThreads());
        return Scanner.of(this.plan.files()).sort(Comparator.comparing((v0) -> {
            return v0.size();
        })).parallelUnordered(this.params.byteReaderParams().makeReadersThreads()).map(kvFileNameAndSize -> {
            int max = atomicInteger.get() == 1 ? Math.max(1, atomicInteger2.get()) : Math.max(1, (int) ((kvFileNameAndSize.size() / this.plan.totalInputSize().toBytes()) * this.params.byteReaderParams().totalThreads()));
            atomicInteger.decrementAndGet();
            atomicInteger2.addAndGet(-max);
            logger.info("making KvReader size={}, threads={}, chunkSize={}", new Object[]{ByteLength.ofBytes(kvFileNameAndSize.size()).toDisplay(), Integer.valueOf(max), this.params.byteReaderParams().chunkSize().toDisplay()});
            return makeReader(kvFileNameAndSize, max, this.params.byteReaderParams().chunkSize());
        });
    }

    private KvFileReader makeReader(KvFileNameAndSize kvFileNameAndSize, int i, ByteLength byteLength) {
        if (kvFileNameAndSize.size() <= byteLength.toBytesInt()) {
            long nanoTime = System.nanoTime();
            byte[] read = this.params.storageParams().storage().read(kvFileNameAndSize.name());
            this.waitForChunksNs.addAndGet(System.nanoTime() - nanoTime);
            this.bytesRead.addAndGet(read.length);
            return new KvFileReader(read, kvFileNameAndSize.name(), this.params.kvReaderParams().parseBatchSize(), this.params.kvReaderParams().parseThreads());
        }
        if (this.params.byteReaderParams().readParallel()) {
            Scanner<byte[]> readParallel = this.params.storageParams().storage().readParallel(kvFileNameAndSize.name(), 0L, kvFileNameAndSize.size(), new Threads(this.params.byteReaderParams().readParallelExec(), i), byteLength);
            AtomicLong atomicLong = this.waitForChunksNs;
            atomicLong.getClass();
            return (KvFileReader) readParallel.timeNanos((v1) -> {
                r1.addAndGet(v1);
            }).each(bArr -> {
                this.bytesRead.addAndGet(bArr.length);
            }).apply(scanner -> {
                return new KvFileReader((Scanner<byte[]>) scanner, kvFileNameAndSize.name(), this.params.kvReaderParams().parseBatchSize(), this.params.kvReaderParams().parseThreads());
            });
        }
        InputStream readInputStream = this.params.storageParams().storage().readInputStream(kvFileNameAndSize.name());
        int bytesInt = ByteLength.ofMiB(1L).toBytesInt();
        AtomicLong atomicLong2 = this.bytesRead;
        atomicLong2.getClass();
        return new KvFileReader(new CountingInputStream(readInputStream, bytesInt, (v1) -> {
            r4.addAndGet(v1);
        }), kvFileNameAndSize.name(), this.params.kvReaderParams().parseBatchSize(), this.params.kvReaderParams().parseThreads());
    }

    private Scanner<List<byte[]>> encodeBlocksToBytes(Scanner<List<List<KvFileEntry>>> scanner) {
        return scanner.parallelOrdered(this.params.encodeParams().encodeThreads()).map(list -> {
            return Scanner.of(list).each(list -> {
                this.recordsWritten.addAndGet(list.size());
                this.recordsWrittenSinceLastLog.addAndGet(list.size());
                this.blocksWritten.incrementAndGet();
            }).map(KvFileBlock::new).map((v0) -> {
                return v0.toBytes();
            }).each(bArr -> {
                this.bytesWritten.addAndGet(bArr.length);
            }).list();
        });
    }

    private KvFileNameAndSize write(Scanner<byte[]> scanner) {
        String str = this.params.storageParams().filenameSupplier().get();
        if (this.plan.totalInputSize().toBytes() > this.params.writeParams().partSize().toBytes()) {
            this.params.storageParams().storage().writeParallel(str, groupBlocksIntoUploadParts(scanner), this.params.writeParams().writeThreads());
        } else {
            this.params.storageParams().storage().write(str, (byte[]) scanner.listTo(ByteTool::concat));
        }
        return new KvFileNameAndSize(str, this.bytesWritten.get());
    }

    private Scanner<List<byte[]>> groupBlocksIntoUploadParts(Scanner<byte[]> scanner) {
        AtomicLong atomicLong = new AtomicLong();
        AtomicLong atomicLong2 = new AtomicLong();
        return scanner.each(bArr -> {
            if (atomicLong2.addAndGet(bArr.length) >= this.params.writeParams().partSize().toBytes()) {
                atomicLong.incrementAndGet();
                atomicLong2.set(0L);
            }
        }).splitBy(bArr2 -> {
            return Long.valueOf(atomicLong.get());
        }).map((v0) -> {
            return v0.list();
        });
    }

    private void logIntermediateProgress() {
        logProgress(false, null);
        this.lastLogTimeNs.set(System.nanoTime());
        this.recordsWrittenSinceLastLog.set(0L);
    }

    private void logProgress(boolean z, KvFileNameAndSize kvFileNameAndSize) {
        Duration between = Duration.between(this.startTime, Instant.now());
        String str = z ? "merged" : "merging";
        Function function = number -> {
            return new DecimalFormat("###,###,###,###,###,###,###").format(number);
        };
        Function function2 = l -> {
            return String.valueOf((String) function.apply(Long.valueOf(l.longValue() / 1000000))) + "ms";
        };
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("rwpsLatest", (String) function.apply(Long.valueOf((this.recordsWrittenSinceLastLog.get() * 1000000000) / (System.nanoTime() - this.lastLogTimeNs.get()))));
        linkedHashMap.put("rwpsCumulative", (String) function.apply(Long.valueOf((this.recordsWritten.get() * 1000000000) / between.toNanos())));
        linkedHashMap.put("recordsRead", (String) function.apply(Long.valueOf(this.recordsRead.get())));
        linkedHashMap.put("recordsWritten", (String) function.apply(Long.valueOf(this.recordsWritten.get())));
        linkedHashMap.put("blocksRead", (String) function.apply(Long.valueOf(this.blocksRead.get())));
        linkedHashMap.put("blocksWritten", (String) function.apply(Long.valueOf(this.blocksWritten.get())));
        linkedHashMap.put("bytesRead", ByteLength.ofBytes(this.bytesRead.get()).toDisplay());
        linkedHashMap.put("bytesWritten", ByteLength.ofBytes(this.bytesWritten.get()).toDisplay());
        linkedHashMap.put("duration", ((String) function2.apply(Long.valueOf(between.toNanos()))).toString());
        linkedHashMap.put("waitForChunks", (String) function2.apply(Long.valueOf(this.waitForChunksNs.get())));
        linkedHashMap.put("waitForCollator", (String) function2.apply(Long.valueOf(this.waitForCollatorNs.get())));
        linkedHashMap.put("waitForShouldStop", (String) function2.apply(Long.valueOf(this.waitForShouldStopNs.get())));
        linkedHashMap.put("waitForEncoder", (String) function2.apply(Long.valueOf(this.waitForEncoderNs.get())));
        linkedHashMap.put("collator", this.plan.collatorStrategy().name());
        if (kvFileNameAndSize != null) {
            linkedHashMap.put("newFile", kvFileNameAndSize.name());
        }
        logger.warn((String) Scanner.of(linkedHashMap.keySet()).map(str2 -> {
            return String.valueOf(str2) + "=" + ((String) linkedHashMap.get(str2));
        }).collect(Collectors.joining(", ", String.valueOf(str) + "[", "]")));
    }

    private void throwIfShouldStop() {
        if (this.shouldStop.get().booleanValue()) {
            throw new RuntimeException("stop requested");
        }
    }
}
