package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.logstreams.log.LogStreamBatchReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.LogStreamReaderRule;
import io.camunda.zeebe.logstreams.util.LogStreamRule;
import io.camunda.zeebe.logstreams.util.LogStreamWriterRule;
import io.camunda.zeebe.util.ByteValue;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.NoSuchElementException;
import java.util.Objects;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/camunda/zeebe/logstreams/impl/log/LogStreamBatchReaderTest.class */
/**
 * Tests for {@link LogStreamBatchReaderImpl}, driven through the {@link LogStreamBatchReader}
 * interface.
 *
 * <p>The expectations below show the grouping under test: consecutive events written with the same
 * source event position are returned together as one batch, while events written without a source
 * event position each end up in a batch of their own.
 */
public class LogStreamBatchReaderTest {
    private static final DirectBuffer EVENT_VALUE = BufferUtil.wrapString("test");
    private static final int LOG_SEGMENT_SIZE = (int) ByteValue.ofMegabytes(4);

    private final LogStreamRule logStreamRule = LogStreamRule.startByDefault(builder -> {
        builder.withMaxFragmentSize(LOG_SEGMENT_SIZE);
    });
    private final LogStreamWriterRule writerRule = new LogStreamWriterRule(logStreamRule);
    private final LogStreamReaderRule readerRule = new LogStreamReaderRule(logStreamRule);

    // The log stream rule is the outermost rule, wrapped around the reader and then the writer rule.
    @Rule
    public final RuleChain ruleChain = RuleChain.outerRule(logStreamRule).around(readerRule).around(writerRule);

    private LogStreamBatchReader batchReader;

    /** Wraps the reader provided by the reader rule in a fresh batch reader for every test. */
    @Before
    public void setUp() {
        batchReader = new LogStreamBatchReaderImpl(readerRule.getLogStreamReader());
    }

    @Test
    public void shouldNotHaveNextIfEmpty() {
        // nothing was written to the log
        Assertions.assertThat(batchReader.hasNext()).isFalse();
    }

    @Test
    public void shouldReadEventsInBatch() {
        // given - two events sharing the same source event position
        long firstPosition = writerRule.sourceEventPosition(1L).writeEvent(entry -> {
            entry.key(1L).value(EVENT_VALUE);
        });
        long secondPosition = writerRule.sourceEventPosition(1L).writeEvent(entry -> {
            entry.key(2L).value(EVENT_VALUE);
        });

        // when
        LogStreamBatchReader.Batch batch = expectNextBatch();

        // then - both events are part of the same batch
        Assertions.assertThat(batch).isNotNull();

        LoggedEvent firstEvent = expectNextEvent(batch);
        Assertions.assertThat(firstEvent.getKey()).isEqualTo(1L);
        Assertions.assertThat(firstEvent.getPosition()).isEqualTo(firstPosition);

        LoggedEvent secondEvent = expectNextEvent(batch);
        Assertions.assertThat(secondEvent.getKey()).isEqualTo(2L);
        Assertions.assertThat(secondEvent.getPosition()).isEqualTo(secondPosition);

        Assertions.assertThat(batch.hasNext()).isFalse();
        Assertions.assertThat(batchReader.hasNext()).isFalse();
    }

    @Test
    public void shouldReadNextBatch() {
        // given - two events with different source event positions
        long firstPosition = writerRule.sourceEventPosition(1L).writeEvent(entry -> {
            entry.key(1L).value(EVENT_VALUE);
        });
        long secondPosition = writerRule.sourceEventPosition(2L).writeEvent(entry -> {
            entry.key(2L).value(EVENT_VALUE);
        });

        // when / then - the first batch holds only the first event
        LogStreamBatchReader.Batch firstBatch = expectNextBatch();
        LoggedEvent firstEvent = expectNextEvent(firstBatch);
        Assertions.assertThat(firstEvent.getKey()).isEqualTo(1L);
        Assertions.assertThat(firstEvent.getPosition()).isEqualTo(firstPosition);
        Assertions.assertThat(firstBatch.hasNext()).isFalse();

        // and the second batch holds only the second event
        LogStreamBatchReader.Batch secondBatch = expectNextBatch();
        LoggedEvent secondEvent = expectNextEvent(secondBatch);
        Assertions.assertThat(secondEvent.getKey()).isEqualTo(2L);
        Assertions.assertThat(secondEvent.getPosition()).isEqualTo(secondPosition);
        Assertions.assertThat(secondBatch.hasNext()).isFalse();

        Assertions.assertThat(batchReader.hasNext()).isFalse();
    }

    @Test
    public void shouldReadEventsWithoutSourceEventPosition() {
        // given - two events written without a source event position
        long firstPosition = writerRule.writeEvent(EVENT_VALUE);
        long secondPosition = writerRule.writeEvent(EVENT_VALUE);

        // when / then - each event is returned in its own batch
        LogStreamBatchReader.Batch firstBatch = expectNextBatch();
        assertNextEventPosition(firstBatch, firstPosition);
        Assertions.assertThat(firstBatch.hasNext()).isFalse();

        LogStreamBatchReader.Batch secondBatch = expectNextBatch();
        assertNextEventPosition(secondBatch, secondPosition);
    }

    @Test
    public void shouldNotIncludeEventsWithoutSourceEventPosition() {
        // given - an event without source event position between two events that have one
        long firstPosition = writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);
        long secondPosition = writerRule.writeEvent(EVENT_VALUE);
        long thirdPosition = writerRule.sourceEventPosition(2L).writeEvent(EVENT_VALUE);

        // when / then - three batches with one event each
        LogStreamBatchReader.Batch firstBatch = expectNextBatch();
        assertNextEventPosition(firstBatch, firstPosition);
        Assertions.assertThat(firstBatch.hasNext()).isFalse();

        LogStreamBatchReader.Batch secondBatch = expectNextBatch();
        assertNextEventPosition(secondBatch, secondPosition);
        Assertions.assertThat(secondBatch.hasNext()).isFalse();

        LogStreamBatchReader.Batch thirdBatch = expectNextBatch();
        assertNextEventPosition(thirdBatch, thirdPosition);
    }

    @Test
    public void shouldMoveBatchToHead() {
        // given - a batch of two events followed by a batch of one event
        long firstPosition = writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);
        long secondPosition = writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);
        long thirdPosition = writerRule.sourceEventPosition(2L).writeEvent(EVENT_VALUE);

        LogStreamBatchReader.Batch firstBatch = expectNextBatch();
        assertNextEventPosition(firstBatch, firstPosition);

        // when
        firstBatch.head();

        // then - the batch can be iterated again from its first event
        assertNextEventPosition(firstBatch, firstPosition);
        assertNextEventPosition(firstBatch, secondPosition);
        Assertions.assertThat(firstBatch.hasNext()).isFalse();

        // and the reader moves on to the following batch (read directly, without a prior hasNext())
        LogStreamBatchReader.Batch secondBatch = (LogStreamBatchReader.Batch) batchReader.next();
        assertNextEventPosition(secondBatch, thirdPosition);
    }

    @Test
    public void shouldSkipEventsInBatch() {
        // given - a batch of two events followed by a batch of one event
        long firstPosition = writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);
        writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);
        long thirdPosition = writerRule.sourceEventPosition(2L).writeEvent(EVENT_VALUE);

        // when - only the first event of the first batch is consumed
        LogStreamBatchReader.Batch firstBatch = expectNextBatch();
        assertNextEventPosition(firstBatch, firstPosition);

        // then - the next batch starts at the third event
        LogStreamBatchReader.Batch secondBatch = expectNextBatch();
        assertNextEventPosition(secondBatch, thirdPosition);
    }

    @Test
    public void shouldSeekToHeadIfNegative() {
        // given
        long firstPosition = writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);
        writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);

        // when
        boolean found = batchReader.seekToNextBatch(-1L);

        // then - reading starts at the first event
        Assertions.assertThat(found).isTrue();
        LogStreamBatchReader.Batch batch = expectNextBatch();
        assertNextEventPosition(batch, firstPosition);
    }

    @Test
    public void shouldSeekToNextBatch() {
        // given
        writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);
        long lastPositionOfFirstBatch = writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);
        long firstPositionOfSecondBatch = writerRule.sourceEventPosition(2L).writeEvent(EVENT_VALUE);

        // when - seeking to the last event of the first batch
        boolean found = batchReader.seekToNextBatch(lastPositionOfFirstBatch);

        // then
        Assertions.assertThat(found).isTrue();
        LogStreamBatchReader.Batch batch = expectNextBatch();
        assertNextEventPosition(batch, firstPositionOfSecondBatch);
    }

    @Test
    public void shouldSeekToNextEventWithinBatch() {
        // given
        long firstPositionOfFirstBatch = writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);
        writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);
        long firstPositionOfSecondBatch = writerRule.sourceEventPosition(2L).writeEvent(EVENT_VALUE);

        // when - seeking to the first event of a two-event batch
        boolean found = batchReader.seekToNextBatch(firstPositionOfFirstBatch);

        // then - the remainder of that batch is skipped as well
        Assertions.assertThat(found).isTrue();
        LogStreamBatchReader.Batch batch = expectNextBatch();
        assertNextEventPosition(batch, firstPositionOfSecondBatch);
    }

    @Test
    public void shouldSeekToTailIfLastEvent() {
        // given
        writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);
        long lastPosition = writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE);

        // when
        boolean found = batchReader.seekToNextBatch(lastPosition);

        // then - the position exists but nothing is left to read
        Assertions.assertThat(found).isTrue();
        Assertions.assertThat(batchReader.hasNext()).isFalse();
    }

    @Test
    public void shouldSeekToNotExistingPosition() {
        // given - a position one past the only written event
        long missingPosition = writerRule.sourceEventPosition(1L).writeEvent(EVENT_VALUE) + 1;

        // when
        boolean found = batchReader.seekToNextBatch(missingPosition);

        // then
        Assertions.assertThat(found).isFalse();
        Assertions.assertThat(batchReader.hasNext()).isFalse();
    }

    @Test
    public void shouldNotHaveNextIfClosed() {
        // when
        batchReader.close();

        // then
        Assertions.assertThat(batchReader.hasNext()).isFalse();
    }

    @Test
    public void shouldThrowNoSuchElementExceptionOnNextBatch() {
        // given - an empty log
        Assertions.assertThat(batchReader.hasNext()).isFalse();

        // when / then
        Assertions.assertThatThrownBy(batchReader::next).isInstanceOf(NoSuchElementException.class);
    }

    @Test
    public void shouldThrowNoSuchElementExceptionOnNextEvent() {
        // given - a batch whose single event has already been consumed
        writerRule.writeEvent(entry -> {
            entry.key(1L).value(EVENT_VALUE);
        });
        LogStreamBatchReader.Batch batch = expectNextBatch();
        batch.next();
        Assertions.assertThat(batch.hasNext()).isFalse();

        // when / then
        Assertions.assertThatThrownBy(batch::next).isInstanceOf(NoSuchElementException.class);
    }

    /**
     * Asserts that the reader under test reports another batch and returns it.
     *
     * @return the batch returned by the reader's {@code next()}
     */
    private LogStreamBatchReader.Batch expectNextBatch() {
        Assertions.assertThat(batchReader.hasNext()).isTrue();
        // NOTE(review): cast kept because the iterator's element type is not visible from this file.
        return (LogStreamBatchReader.Batch) batchReader.next();
    }

    /**
     * Asserts that the given batch reports another event and returns it.
     *
     * @param batch the batch to read from
     * @return the event returned by the batch's {@code next()}
     */
    private static LoggedEvent expectNextEvent(LogStreamBatchReader.Batch batch) {
        Assertions.assertThat(batch.hasNext()).isTrue();
        // NOTE(review): cast kept because the iterator's element type is not visible from this file.
        return (LoggedEvent) batch.next();
    }

    /**
     * Asserts that the given batch reports another event and that this event has the expected
     * position.
     *
     * @param batch the batch to read from
     * @param expectedPosition the position the next event must have
     */
    private static void assertNextEventPosition(LogStreamBatchReader.Batch batch, long expectedPosition) {
        Assertions.assertThat(expectNextEvent(batch).getPosition()).isEqualTo(expectedPosition);
    }
}
