package io.aeron.samples.archive;

import io.aeron.Aeron;
import io.aeron.ChannelUriStringBuilder;
import io.aeron.CommonContext;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.archive.Archive;
import io.aeron.archive.ArchiveThreadingMode;
import io.aeron.archive.ArchivingMediaDriver;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import java.io.File;
import java.nio.ByteOrder;
import java.util.Random;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.SystemUtil;
import org.agrona.collections.LongArrayList;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.YieldingIdleStrategy;
import org.agrona.concurrent.status.CountersReader;

/* loaded from: input_file:io/aeron/samples/archive/IndexedReplicatedRecording.class */
public class IndexedReplicatedRecording implements AutoCloseable {
    static final int MESSAGE_INDEX_OFFSET = 0;
    static final int TIMESTAMP_OFFSET = 8;
    static final int HEADER_LENGTH = 16;
    static final int MESSAGE_BURST_COUNT = 10000;
    private static final long CATALOG_CAPACITY = 65536;
    private static final int SRC_CONTROL_STREAM_ID = 10;
    private static final String SRC_CONTROL_REQUEST_CHANNEL = "aeron:udp?endpoint=localhost:8090";
    private static final String SRC_CONTROL_RESPONSE_CHANNEL = "aeron:udp?endpoint=localhost:0";
    private static final String DST_CONTROL_REQUEST_CHANNEL = "aeron:udp?endpoint=localhost:8095";
    private static final String DST_CONTROL_RESPONSE_CHANNEL = "aeron:udp?endpoint=localhost:0";
    private static final String SRC_REPLICATION_CHANNEL = "aeron:udp?endpoint=localhost:0";
    private static final String DST_REPLICATION_CHANNEL = "aeron:udp?endpoint=localhost:0";
    private static final int LIVE_STREAM_ID = 1033;
    private static final int INDEX_STREAM_ID = 1097;
    private final ArchivingMediaDriver srcArchivingMediaDriver;
    private final ArchivingMediaDriver dstArchivingMediaDriver;
    private final Aeron srcAeron;
    private final Aeron dstAeron;
    private final AeronArchive srcAeronArchive;
    private final AeronArchive dstAeronArchive;
    private static final int TERM_LENGTH = 65536;
    private static final String LIVE_CHANNEL = new ChannelUriStringBuilder().media("udp").controlEndpoint("localhost:8100").termLength(Integer.valueOf(TERM_LENGTH)).build();
    private static final String INDEX_CHANNEL = new ChannelUriStringBuilder().media("ipc").termLength(Integer.valueOf(TERM_LENGTH)).build();

    /* loaded from: input_file:io/aeron/samples/archive/IndexedReplicatedRecording$Indexer.class */
    static class Indexer implements AutoCloseable, Runnable, ControlledFragmentHandler {
        private static final int FRAGMENT_LIMIT = 10;
        private static final int INDEX_BUFFER_CAPACITY = 1024;
        private static final int BATCH_SIZE = 126;
        private final int sessionId;
        private final Subscription subscription;
        private final Publication publication;
        private Image image;
        private int nextMessageIndex = 0;
        private int batchIndex = 0;
        private long lastMessagePosition = -1;
        private final LongArrayList messagePositions = new LongArrayList();
        private final LongArrayList timestamps = new LongArrayList();
        private final LongArrayList timestampPositions = new LongArrayList();
        private final UnsafeBuffer indexBuffer = new UnsafeBuffer(new byte[INDEX_BUFFER_CAPACITY]);

        static Thread start(Indexer indexer) {
            Thread thread = new Thread(indexer);
            thread.setName("indexer");
            thread.setDaemon(true);
            thread.start();
            return thread;
        }

        Indexer(Subscription subscription, Publication publication, int i) {
            this.subscription = subscription;
            this.publication = publication;
            this.sessionId = i;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            CloseHelper.close(this.subscription);
        }

        long position() {
            if (null == this.image) {
                return -1L;
            }
            return this.image.position();
        }

        void awaitPosition(long j) {
            while (position() < j) {
                Thread.yield();
            }
        }

        long nextMessageIndex() {
            return this.nextMessageIndex;
        }

        LongArrayList messagePositions() {
            return this.messagePositions;
        }

        LongArrayList timestamps() {
            return this.timestamps;
        }

        LongArrayList timestampPositions() {
            return this.timestampPositions;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                if (this.subscription.isConnected() && this.publication.isConnected()) {
                    break;
                }
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            Image imageBySessionId = this.subscription.imageBySessionId(this.sessionId);
            this.image = imageBySessionId;
            if (null == imageBySessionId) {
                throw new IllegalStateException("session not found");
            }
            this.lastMessagePosition = imageBySessionId.joinPosition();
            YieldingIdleStrategy yieldingIdleStrategy = YieldingIdleStrategy.INSTANCE;
            while (true) {
                int controlledPoll = imageBySessionId.controlledPoll(this, 10);
                if (0 == controlledPoll && (Thread.interrupted() || imageBySessionId.isClosed())) {
                    return;
                } else {
                    yieldingIdleStrategy.idle(controlledPoll);
                }
            }
        }

        public ControlledFragmentHandler.Action onFragment(DirectBuffer directBuffer, int i, int i2, Header header) {
            long j = this.lastMessagePosition;
            long j2 = directBuffer.getLong(i + 0, ByteOrder.LITTLE_ENDIAN);
            if (j2 != this.nextMessageIndex) {
                throw new IllegalStateException("invalid index: expected=" + this.nextMessageIndex + " actual=" + j2);
            }
            if (0 == this.batchIndex) {
                long j3 = directBuffer.getLong(i + 8, ByteOrder.LITTLE_ENDIAN);
                this.timestamps.addLong(j3);
                this.timestampPositions.addLong(j);
                this.indexBuffer.putLong(0, this.nextMessageIndex, ByteOrder.LITTLE_ENDIAN);
                this.indexBuffer.putLong(8, j3, ByteOrder.LITTLE_ENDIAN);
            }
            this.indexBuffer.putLong(16 + (this.batchIndex * 8), j, ByteOrder.LITTLE_ENDIAN);
            int i3 = this.batchIndex + 1;
            this.batchIndex = i3;
            if (i3 >= BATCH_SIZE) {
                if (this.publication.offer(this.indexBuffer, 0, INDEX_BUFFER_CAPACITY) <= 0) {
                    this.batchIndex--;
                    return ControlledFragmentHandler.Action.ABORT;
                }
                this.batchIndex = 0;
            }
            this.messagePositions.addLong(j);
            this.lastMessagePosition = header.position();
            this.nextMessageIndex++;
            return ControlledFragmentHandler.Action.CONTINUE;
        }
    }

    /* loaded from: input_file:io/aeron/samples/archive/IndexedReplicatedRecording$Sequencer.class */
    static class Sequencer implements AutoCloseable {
        private static final int MAX_MESSAGE_LENGTH = 1000;
        private long nextMessageIndex;
        private final int burstLength;
        private final Publication publication;
        private final Random random = new Random();
        private final UnsafeBuffer buffer = new UnsafeBuffer(new byte[1016]);

        Sequencer(int i, Publication publication) {
            this.burstLength = i;
            this.publication = publication;
            this.buffer.setMemory(16, MAX_MESSAGE_LENGTH, (byte) 88);
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            CloseHelper.close(this.publication);
        }

        long nextMessageIndex() {
            return this.nextMessageIndex;
        }

        void sendBurst() throws InterruptedException {
            for (int i = 0; i < this.burstLength; i++) {
                appendMessage();
            }
        }

        private void appendMessage() throws InterruptedException {
            int nextInt = this.random.nextInt(MAX_MESSAGE_LENGTH);
            this.buffer.putLong(0, this.nextMessageIndex, ByteOrder.LITTLE_ENDIAN);
            this.buffer.putLong(8, System.currentTimeMillis(), ByteOrder.LITTLE_ENDIAN);
            while (this.publication.offer(this.buffer, 0, 16 + nextInt) < 0) {
                Thread.yield();
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
            }
            this.nextMessageIndex++;
        }
    }

    IndexedReplicatedRecording() {
        String str = CommonContext.getAeronDirectoryName() + "-src";
        System.out.println("srcAeronDirectoryName=" + str);
        String str2 = CommonContext.getAeronDirectoryName() + "-dst";
        System.out.println("dstAeronDirectoryName=" + str2);
        File file = new File(SystemUtil.tmpDirName(), "src-archive");
        System.out.println("srcArchiveDir=" + file);
        this.srcArchivingMediaDriver = ArchivingMediaDriver.launch(new MediaDriver.Context().aeronDirectoryName(str).termBufferSparseFile(true).threadingMode(ThreadingMode.SHARED).errorHandler((v0) -> {
            v0.printStackTrace();
        }).spiesSimulateConnection(true).dirDeleteOnShutdown(true).dirDeleteOnStart(true), new Archive.Context().catalogCapacity(CATALOG_CAPACITY).controlChannel(SRC_CONTROL_REQUEST_CHANNEL).archiveClientContext(new AeronArchive.Context().controlResponseChannel("aeron:udp?endpoint=localhost:0")).replicationChannel("aeron:udp?endpoint=localhost:0").deleteArchiveOnStart(true).archiveDir(file).fileSyncLevel(0).threadingMode(ArchiveThreadingMode.SHARED));
        File file2 = new File(SystemUtil.tmpDirName(), "dst-archive");
        System.out.println("dstArchiveDir=" + file2);
        this.dstArchivingMediaDriver = ArchivingMediaDriver.launch(new MediaDriver.Context().aeronDirectoryName(str2).termBufferSparseFile(true).threadingMode(ThreadingMode.SHARED).errorHandler((v0) -> {
            v0.printStackTrace();
        }).spiesSimulateConnection(true).dirDeleteOnShutdown(true).dirDeleteOnStart(true), new Archive.Context().catalogCapacity(CATALOG_CAPACITY).controlChannel(DST_CONTROL_REQUEST_CHANNEL).archiveClientContext(new AeronArchive.Context().controlResponseChannel("aeron:udp?endpoint=localhost:0")).replicationChannel("aeron:udp?endpoint=localhost:0").deleteArchiveOnStart(true).archiveDir(file2).fileSyncLevel(0).threadingMode(ArchiveThreadingMode.SHARED));
        this.srcAeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(str));
        this.dstAeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(str2));
        this.srcAeronArchive = AeronArchive.connect(new AeronArchive.Context().idleStrategy(YieldingIdleStrategy.INSTANCE).controlRequestChannel(SRC_CONTROL_REQUEST_CHANNEL).controlResponseChannel("aeron:udp?endpoint=localhost:0").aeron(this.srcAeron));
        this.dstAeronArchive = AeronArchive.connect(new AeronArchive.Context().idleStrategy(YieldingIdleStrategy.INSTANCE).controlRequestChannel(DST_CONTROL_REQUEST_CHANNEL).controlResponseChannel("aeron:udp?endpoint=localhost:0").aeron(this.dstAeron));
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        CloseHelper.closeAll(new AutoCloseable[]{this.srcAeronArchive, this.dstAeronArchive, this.srcAeron, this.dstAeron, this.srcArchivingMediaDriver, this.dstArchivingMediaDriver});
        this.srcArchivingMediaDriver.archive().context().deleteDirectory();
        this.dstArchivingMediaDriver.archive().context().deleteDirectory();
    }

    public static void main(String[] strArr) throws InterruptedException {
        IndexedReplicatedRecording indexedReplicatedRecording = new IndexedReplicatedRecording();
        Throwable th = null;
        try {
            ExclusivePublication addExclusivePublication = indexedReplicatedRecording.srcAeron.addExclusivePublication(LIVE_CHANNEL, LIVE_STREAM_ID);
            String str = LIVE_CHANNEL + "|session-id=" + addExclusivePublication.sessionId();
            Sequencer sequencer = new Sequencer(10000, addExclusivePublication);
            Indexer indexer = new Indexer(indexedReplicatedRecording.srcAeron.addSubscription("aeron-spy:" + str, LIVE_STREAM_ID), indexedReplicatedRecording.srcAeron.addExclusivePublication(INDEX_CHANNEL, INDEX_STREAM_ID), addExclusivePublication.sessionId());
            indexedReplicatedRecording.srcAeronArchive.startRecording(INDEX_CHANNEL, INDEX_STREAM_ID, SourceLocation.LOCAL, true);
            Thread start = Indexer.start(indexer);
            long startRecording = indexedReplicatedRecording.srcAeronArchive.startRecording(str, LIVE_STREAM_ID, SourceLocation.LOCAL, true);
            CountersReader countersReader = indexedReplicatedRecording.srcAeron.countersReader();
            int awaitRecordingCounterId = awaitRecordingCounterId(countersReader, addExclusivePublication.sessionId());
            long recordingId = RecordingPos.getRecordingId(countersReader, awaitRecordingCounterId);
            sequencer.sendBurst();
            long nextCorrelationId = indexedReplicatedRecording.dstAeron.nextCorrelationId();
            long nextCorrelationId2 = indexedReplicatedRecording.dstAeron.nextCorrelationId();
            Indexer indexer2 = new Indexer(indexedReplicatedRecording.dstAeron.addSubscription("aeron:udp?control-mode=manual|rejoin=false|tags=" + nextCorrelationId + "," + nextCorrelationId2, LIVE_STREAM_ID), indexedReplicatedRecording.dstAeron.addExclusivePublication(INDEX_CHANNEL, INDEX_STREAM_ID), addExclusivePublication.sessionId());
            indexedReplicatedRecording.dstAeronArchive.startRecording(INDEX_CHANNEL, INDEX_STREAM_ID, SourceLocation.LOCAL, true);
            Thread start2 = Indexer.start(indexer2);
            long taggedReplicate = indexedReplicatedRecording.dstAeronArchive.taggedReplicate(recordingId, -1L, nextCorrelationId, nextCorrelationId2, 10, SRC_CONTROL_REQUEST_CHANNEL, LIVE_CHANNEL);
            sequencer.sendBurst();
            sequencer.sendBurst();
            long position = addExclusivePublication.position();
            awaitPosition(countersReader, awaitRecordingCounterId, position);
            indexer.awaitPosition(position);
            CountersReader countersReader2 = indexedReplicatedRecording.dstAeron.countersReader();
            awaitPosition(countersReader2, awaitRecordingCounterId(countersReader2, addExclusivePublication.sessionId()), position);
            indexer2.awaitPosition(position);
            start.interrupt();
            start.join();
            indexer.close();
            start2.interrupt();
            start2.join();
            indexer2.close();
            indexedReplicatedRecording.dstAeronArchive.stopReplication(taggedReplicate);
            indexedReplicatedRecording.srcAeronArchive.stopRecording(startRecording);
            assertEquals("index", sequencer.nextMessageIndex(), indexer.nextMessageIndex());
            assertEquals("index", sequencer.nextMessageIndex(), indexer2.nextMessageIndex());
            assertEquals("positions", indexer.messagePositions(), indexer2.messagePositions());
            assertEquals("timestamps", indexer.timestamps(), indexer2.timestamps());
            assertEquals("timestamp positions", indexer.timestampPositions(), indexer2.timestampPositions());
            if (indexedReplicatedRecording != null) {
                if (0 == 0) {
                    indexedReplicatedRecording.close();
                    return;
                }
                try {
                    indexedReplicatedRecording.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (indexedReplicatedRecording != null) {
                if (0 != 0) {
                    try {
                        indexedReplicatedRecording.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    indexedReplicatedRecording.close();
                }
            }
            throw th3;
        }
    }

    static int awaitRecordingCounterId(CountersReader countersReader, int i) throws InterruptedException {
        do {
            int findCounterIdBySession = RecordingPos.findCounterIdBySession(countersReader, i);
            if (-1 != findCounterIdBySession) {
                return findCounterIdBySession;
            }
            Thread.yield();
        } while (!Thread.interrupted());
        throw new InterruptedException();
    }

    static void awaitPosition(CountersReader countersReader, int i, long j) throws InterruptedException {
        while (countersReader.getCounterValue(i) < j) {
            if (countersReader.getCounterState(i) != 1) {
                throw new IllegalStateException("count not active: " + i);
            }
            Thread.yield();
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        }
    }

    static void assertEquals(String str, long j, long j2) {
        if (j != j2) {
            throw new IllegalStateException(str + " not equal: srcValue=" + j + " dstValue=" + j2);
        }
    }

    static void assertEquals(String str, LongArrayList longArrayList, LongArrayList longArrayList2) {
        int size = longArrayList.size();
        int size2 = longArrayList2.size();
        if (size != size2) {
            throw new IllegalStateException(str + " not equal: srcList.size=" + size + " dstList.size=" + size2);
        }
        for (int i = 0; i < size; i++) {
            long j = longArrayList.getLong(i);
            long j2 = longArrayList.getLong(i);
            if (j != j2) {
                throw new IllegalStateException(str + " [" + i + "] not equal: srcVal=" + j + " dstVal=" + j2);
            }
        }
    }
}
