package io.camunda.zeebe.logstreams.log;

import io.camunda.zeebe.logstreams.util.LogStreamRule;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/camunda/zeebe/logstreams/log/LogStreamTest.class */
/**
 * Tests for the log stream obtained from {@link LogStreamRule}: creation of readers and writers,
 * behavior after closing, position continuity across a restart, and notification of
 * record-available listeners.
 */
public final class LogStreamTest {
    public static final int PARTITION_ID = 0;

    /** Upper bound, in seconds, for waiting on a record-available notification. */
    private static final long NOTIFICATION_TIMEOUT_SECONDS = 2L;

    private final LogStreamRule logStreamRule = LogStreamRule.startByDefault();

    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(this.logStreamRule);

    private SynchronousLogStream logStream;

    /** Fetches the log stream provided by the rule before each test. */
    @Before
    public void setup() {
        this.logStream = this.logStreamRule.getLogStream();
    }

    /** The stream exposes the expected partition id and log name and hands out a reader and writers. */
    @Test
    public void shouldBuildLogStream() {
        Assertions.assertThat(this.logStream.getPartitionId()).isEqualTo(PARTITION_ID);
        Assertions.assertThat(this.logStream.getLogName()).isEqualTo("0");

        Assertions.assertThat(this.logStream.newLogStreamReader()).isNotNull();
        Assertions.assertThat(this.logStream.newLogStreamBatchWriter()).isNotNull();
        Assertions.assertThat(this.logStream.newLogStreamRecordWriter()).isNotNull();
    }

    /** Two consecutive requests for a record writer yield two non-null, non-equal writers. */
    @Test
    public void shouldCreateNewLogStreamRecordWriter() {
        // when
        final LogStreamRecordWriter firstWriter = this.logStream.newLogStreamRecordWriter();
        final LogStreamRecordWriter secondWriter = this.logStream.newLogStreamRecordWriter();

        // then
        Assertions.assertThat(firstWriter).isNotNull();
        Assertions.assertThat(secondWriter).isNotNull().isNotEqualTo(firstWriter);
    }

    /** Two consecutive requests for a batch writer yield two non-null, non-equal writers. */
    @Test
    public void shouldCreateNewLogStreamBatchWriter() {
        // when
        final LogStreamBatchWriter firstWriter = this.logStream.newLogStreamBatchWriter();
        final LogStreamBatchWriter secondWriter = this.logStream.newLogStreamBatchWriter();

        // then
        Assertions.assertThat(firstWriter).isNotNull();
        Assertions.assertThat(secondWriter).isNotNull().isNotEqualTo(firstWriter);
    }

    /** After closing the stream, requesting either kind of writer fails with "Actor is closed". */
    @Test
    public void shouldCloseLogStream() {
        // when
        this.logStream.close();

        // then
        Assertions.assertThatThrownBy(() -> this.logStream.newLogStreamRecordWriter())
                .hasMessage("Actor is closed");
        Assertions.assertThatThrownBy(() -> this.logStream.newLogStreamBatchWriter())
                .hasMessage("Actor is closed");
    }

    /** A record written after re-creating the stream gets a position above the last one written before. */
    @Test
    public void shouldIncreasePositionOnRestart() {
        // given
        // NOTE(review): the tryWrite results below are not checked. writeEvent() in this class
        // retries while the result is negative, which suggests a negative value means the write
        // was rejected; if that can happen here the final assertion is weaker than intended.
        // TODO confirm.
        final LogStreamRecordWriter writer = this.logStream.newLogStreamRecordWriter();
        writer.value(BufferUtil.wrapString("value")).tryWrite();
        writer.value(BufferUtil.wrapString("value")).tryWrite();
        writer.value(BufferUtil.wrapString("value")).tryWrite();
        final long positionBeforeClose = writer.value(BufferUtil.wrapString("value")).tryWrite();
        TestUtil.waitUntil(() -> this.logStream.getLastWrittenPosition() >= positionBeforeClose);

        // when
        this.logStream.close();
        this.logStreamRule.createLogStream();
        final LogStreamRecordWriter writerAfterReopen =
                this.logStreamRule.getLogStream().newLogStreamRecordWriter();
        final long positionAfterReopen =
                writerAfterReopen.value(BufferUtil.wrapString("value")).tryWrite();

        // then
        Assertions.assertThat(positionAfterReopen).isGreaterThan(positionBeforeClose);
    }

    /** A registered record-available listener is invoked after an event has been written. */
    @Test
    public void shouldNotifyWhenNewRecordsAreAvailable() throws InterruptedException {
        // given
        final CountDownLatch notified = new CountDownLatch(1);
        this.logStream.getAsyncLogStream().registerRecordAvailableListener(notified::countDown);

        // when
        writeEvent(this.logStream);

        // then
        Assertions.assertThat(notified.await(NOTIFICATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)).isTrue();
    }

    /** Every registered record-available listener is invoked after an event has been written. */
    @Test
    public void shouldNotifyMultipleListenersWhenNewRecordsAreAvailable() throws InterruptedException {
        // given
        final CountDownLatch firstNotified = new CountDownLatch(1);
        this.logStream.getAsyncLogStream().registerRecordAvailableListener(firstNotified::countDown);
        final CountDownLatch secondNotified = new CountDownLatch(1);
        this.logStream.getAsyncLogStream().registerRecordAvailableListener(secondNotified::countDown);

        // when
        writeEvent(this.logStream);

        // then
        Assertions.assertThat(firstNotified.await(NOTIFICATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(secondNotified.await(NOTIFICATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)).isTrue();
    }

    /**
     * Writes a record with the value {@code "event"} to the given stream.
     *
     * @param synchronousLogStream the stream to write to
     * @return the position returned by the successful write
     */
    static long writeEvent(SynchronousLogStream synchronousLogStream) {
        return writeEvent(synchronousLogStream, BufferUtil.wrapString("event"));
    }

    /**
     * Writes a record with the given value, retrying until the writer returns a non-negative
     * position, then waits until the stream's last written position has reached it.
     *
     * @param synchronousLogStream the stream to write to
     * @param directBuffer the record value
     * @return the non-negative position returned by the successful write
     */
    static long writeEvent(SynchronousLogStream synchronousLogStream, DirectBuffer directBuffer) {
        final LogStreamRecordWriter writer = synchronousLogStream.newLogStreamRecordWriter();

        long position = -1L;
        while (position < 0) {
            position = writer.value(directBuffer).tryWrite();
        }

        // lambdas may only capture effectively final locals, so snapshot the loop variable
        final long writtenPosition = position;
        TestUtil.waitUntil(() -> synchronousLogStream.getLastWrittenPosition() >= writtenPosition);
        return writtenPosition;
    }
}
