package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.streamprocessor.state.MutableLastProcessedPositionState;
import java.util.Objects;
import java.util.function.Predicate;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.verification.VerificationWithTimeout;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReplayModeTest.class */
public final class StreamProcessorReplayModeTest {
    private static final int PARTITION_ID = 1;

    @Rule
    public final StreamProcessorRule replayUntilEnd = new StreamProcessorRule(1).withStreamProcessorMode(StreamProcessorMode.PROCESSING);

    @Rule
    public final StreamProcessorRule replayContinuously = new StreamProcessorRule(1).withStreamProcessorMode(StreamProcessorMode.REPLAY);

    @Rule
    public MockitoRule mockitoRule = MockitoJUnit.rule();

    @Mock
    private TypedRecordProcessor<?> typedRecordProcessor;

    @Mock
    private EventApplier eventApplier;
    private static final long TIMEOUT_MILLIS = 2000;
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);
    private static final ProcessInstanceRecord RECORD = Records.processInstance(1);

    @Test
    public void shouldReplayUntilEnd() {
        this.replayUntilEnd.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        startStreamProcessor(this.replayUntilEnd);
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(getCurrentPhase(this.replayUntilEnd)).isEqualTo(StreamProcessor.Phase.PROCESSING);
        });
        this.replayUntilEnd.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        InOrder inOrder = Mockito.inOrder(new Object[]{this.typedRecordProcessor, this.eventApplier});
        ((EventApplier) inOrder.verify(this.eventApplier, TIMEOUT)).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING), (RecordValue) ArgumentMatchers.any());
        ((TypedRecordProcessor) inOrder.verify(this.typedRecordProcessor, TIMEOUT)).processRecord((TypedRecord) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldReplayContinuously() {
        this.replayContinuously.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        startStreamProcessor(this.replayContinuously);
        this.replayContinuously.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        ((EventApplier) Mockito.verify(this.eventApplier, TIMEOUT.times(2))).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING), (RecordValue) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.eventApplier});
        Assertions.assertThat(getCurrentPhase(this.replayContinuously)).isEqualTo(StreamProcessor.Phase.REPLAY);
    }

    @Test
    public void shouldReplayIfNoEventsAfterSnapshot() {
        startStreamProcessor(this.replayContinuously);
        this.replayContinuously.getLastProcessedPositionState().markAsProcessed(1L);
        this.replayContinuously.snapshot();
        this.replayContinuously.closeStreamProcessor();
        startStreamProcessor(this.replayContinuously);
        this.replayContinuously.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter -> {
            return fluentLogWriter.key(1L).sourceRecordPosition(1L);
        });
        this.replayContinuously.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        InOrder inOrder = Mockito.inOrder(new Object[]{this.typedRecordProcessor, this.eventApplier});
        ((EventApplier) inOrder.verify(this.eventApplier, TIMEOUT.times(1))).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING), (RecordValue) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldNotReplayWhenPaused() {
        startWithPausedStreamProcessor(this.replayContinuously);
        this.replayContinuously.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        Mockito.inOrder(new Object[]{this.typedRecordProcessor, this.eventApplier}).verifyNoMoreInteractions();
        Assertions.assertThat(getCurrentPhase(this.replayContinuously)).isEqualTo(StreamProcessor.Phase.PAUSED);
    }

    @Test
    public void shouldPauseReplay() {
        StreamProcessor startStreamProcessor = startStreamProcessor(this.replayContinuously);
        this.replayContinuously.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        ConditionFactory await = Awaitility.await("should have replayed first events");
        StreamProcessorRule streamProcessorRule = this.replayContinuously;
        Objects.requireNonNull(streamProcessorRule);
        await.until(streamProcessorRule::getLastSuccessfulProcessedRecordPosition, l -> {
            return l.longValue() > 0;
        });
        startStreamProcessor.pauseProcessing().join();
        this.replayContinuously.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        InOrder inOrder = Mockito.inOrder(new Object[]{this.typedRecordProcessor, this.eventApplier});
        ((EventApplier) inOrder.verify(this.eventApplier, TIMEOUT.times(1))).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING), (RecordValue) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
        Assertions.assertThat(getCurrentPhase(this.replayContinuously)).isEqualTo(StreamProcessor.Phase.PAUSED);
    }

    @Test
    public void shouldReplayAfterResumed() {
        startWithPausedStreamProcessor(this.replayContinuously);
        this.replayContinuously.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        this.replayContinuously.resumeProcessing(1);
        ((EventApplier) Mockito.verify(this.eventApplier, TIMEOUT.times(1))).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING), (RecordValue) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.eventApplier});
        Assertions.assertThat(getCurrentPhase(this.replayContinuously)).isEqualTo(StreamProcessor.Phase.REPLAY);
    }

    @Test
    public void shouldReplayMoreAfterResumed() {
        StreamProcessor startStreamProcessor = startStreamProcessor(this.replayContinuously);
        this.replayContinuously.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        ConditionFactory await = Awaitility.await("should have replayed first events");
        StreamProcessorRule streamProcessorRule = this.replayContinuously;
        Objects.requireNonNull(streamProcessorRule);
        await.until(streamProcessorRule::getLastSuccessfulProcessedRecordPosition, l -> {
            return l.longValue() > 0;
        });
        startStreamProcessor.pauseProcessing().join();
        this.replayContinuously.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        startStreamProcessor.resumeProcessing();
        InOrder inOrder = Mockito.inOrder(new Object[]{this.typedRecordProcessor, this.eventApplier});
        ((EventApplier) inOrder.verify(this.eventApplier, TIMEOUT.times(2))).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING), (RecordValue) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
        Assertions.assertThat(getCurrentPhase(this.replayContinuously)).isEqualTo(StreamProcessor.Phase.REPLAY);
    }

    @Test
    public void shouldUpdateLastProcessedAndWrittenPositionOnReplay() {
        startStreamProcessor(this.replayContinuously);
        long writeCommand = this.replayContinuously.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        long writeEvent = this.replayContinuously.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter -> {
            return fluentLogWriter.sourceRecordPosition(writeCommand);
        });
        ((EventApplier) Mockito.verify(this.eventApplier, TIMEOUT)).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING), (RecordValue) ArgumentMatchers.any());
        Awaitility.await().untilAsserted(() -> {
            Long lastProcessedPosition = getLastProcessedPosition(this.replayContinuously);
            Long lastWrittenPosition = getLastWrittenPosition(this.replayContinuously);
            ((AbstractLongAssert) Assertions.assertThat(lastProcessedPosition).describedAs("Expected the position of the command to be the last processed position", new Object[0])).isEqualTo(writeCommand);
            ((AbstractLongAssert) Assertions.assertThat(lastWrittenPosition).describedAs("Expected the position of the event to be the last written position", new Object[0])).isEqualTo(writeEvent);
        });
    }

    @Test
    public void shouldSetLastProcessedPositionOnStateToSourcePosition() {
        startStreamProcessor(this.replayContinuously);
        long writeCommand = this.replayContinuously.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.replayContinuously.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter -> {
            return fluentLogWriter.sourceRecordPosition(writeCommand);
        });
        ((EventApplier) Mockito.verify(this.eventApplier, TIMEOUT)).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING), (RecordValue) ArgumentMatchers.any());
        Awaitility.await().until(() -> {
            return getLastProcessedPosition(this.replayContinuously);
        }, Predicate.isEqual(Long.valueOf(writeCommand)));
        ((AbstractLongAssert) Assertions.assertThat(this.replayContinuously.getLastSuccessfulProcessedRecordPosition()).describedAs("Last processed position in the state must be the last source position", new Object[0])).isEqualTo(writeCommand);
    }

    @Test
    public void shouldNotSetLastProcessedPositionIfLessThanSnapshotPosition() {
        startStreamProcessor(this.replayContinuously);
        this.replayContinuously.getLastProcessedPositionState().markAsProcessed(2L);
        this.replayContinuously.snapshot();
        this.replayContinuously.closeStreamProcessor();
        startStreamProcessor(this.replayContinuously);
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(getCurrentPhase(this.replayContinuously)).isEqualTo(StreamProcessor.Phase.REPLAY);
        });
        MutableLastProcessedPositionState lastProcessedPositionState = this.replayContinuously.getLastProcessedPositionState();
        Awaitility.await().untilAsserted(() -> {
            ((AbstractLongAssert) Assertions.assertThat(lastProcessedPositionState.getLastSuccessfulProcessedRecordPosition()).describedAs("Expected that the last processed position is not less than the snapshot position", new Object[0])).isEqualTo(2L);
        });
    }

    private StreamProcessor startStreamProcessor(StreamProcessorRule streamProcessorRule) {
        return streamProcessorRule.withEventApplierFactory(mutableProcessingState -> {
            return this.eventApplier;
        }).startTypedStreamProcessorNotAwaitOpening((typedRecordProcessors, typedRecordProcessorContext) -> {
            return typedRecordProcessors.onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, this.typedRecordProcessor);
        });
    }

    private void startWithPausedStreamProcessor(StreamProcessorRule streamProcessorRule) {
        startStreamProcessor(streamProcessorRule).pauseProcessing().join();
    }

    private StreamProcessor.Phase getCurrentPhase(StreamProcessorRule streamProcessorRule) {
        return (StreamProcessor.Phase) getStreamProcessor(streamProcessorRule).getCurrentPhase().join();
    }

    private Long getLastProcessedPosition(StreamProcessorRule streamProcessorRule) {
        return (Long) getStreamProcessor(streamProcessorRule).getLastProcessedPositionAsync().join();
    }

    private Long getLastWrittenPosition(StreamProcessorRule streamProcessorRule) {
        return (Long) getStreamProcessor(streamProcessorRule).getLastWrittenPositionAsync().join();
    }

    private StreamProcessor getStreamProcessor(StreamProcessorRule streamProcessorRule) {
        return streamProcessorRule.getStreamProcessor(1);
    }
}
