package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.verification.VerificationWithTimeout;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReplayTest.class */
public final class StreamProcessorReplayTest {
    private static final long TIMEOUT_MILLIS = 2000;
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);
    private static final ProcessInstanceRecord RECORD = Records.processInstance(1);

    @Rule
    public final StreamProcessorRule streamProcessorRule = new StreamProcessorRule();

    @Rule
    public MockitoRule mockitoRule = MockitoJUnit.rule();

    @Mock
    private TypedRecordProcessor<?> typedRecordProcessor;

    @Mock
    private EventApplier eventApplier;

    @Test
    public void shouldReplayEvents() {
        this.streamProcessorRule.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        ((EventApplier) Mockito.verify(this.eventApplier, TIMEOUT)).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ELEMENT_ACTIVATING), (RecordValue) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.eventApplier});
        Mockito.verifyNoInteractions(new Object[]{this.typedRecordProcessor});
    }

    @Test
    public void shouldSkipCommands() {
        this.streamProcessorRule.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        ((EventApplier) Mockito.verify(this.eventApplier, Mockito.never())).applyState(ArgumentMatchers.anyLong(), (Intent) ArgumentMatchers.eq(ProcessInstanceIntent.ACTIVATE_ELEMENT), (RecordValue) ArgumentMatchers.any());
        Mockito.verifyNoInteractions(new Object[]{this.typedRecordProcessor});
    }

    @Test
    public void shouldSkipRejections() {
        this.streamProcessorRule.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.rejection().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD).causedBy(0));
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        Mockito.verifyNoInteractions(new Object[]{this.typedRecordProcessor, this.eventApplier});
    }

    @Test
    public void shouldNotReplayEventIfAlreadyApplied() {
        this.streamProcessorRule.withEventApplierFactory(mutableProcessingState -> {
            return this.eventApplier;
        }).startTypedStreamProcessor((typedRecordProcessors, typedRecordProcessorContext) -> {
            return typedRecordProcessors.onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorReplayTest.1
                public void processRecord(TypedRecord<UnifiedRecordValue> typedRecord) {
                    typedRecordProcessorContext.getWriters().sideEffect().appendSideEffect(() -> {
                        return true;
                    });
                }
            });
        });
        long writeCommand = this.streamProcessorRule.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
        this.streamProcessorRule.writeEvent((Intent) ProcessInstanceIntent.ELEMENT_ACTIVATING, (UnpackedObject) RECORD, fluentLogWriter -> {
            return fluentLogWriter.key(1L).sourceRecordPosition(writeCommand);
        });
        awaitUntilProcessed(writeCommand);
        this.streamProcessorRule.snapshot();
        this.streamProcessorRule.closeStreamProcessor();
        this.streamProcessorRule.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().key(2L).processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        InOrder inOrder = Mockito.inOrder(new Object[]{this.eventApplier});
        ((EventApplier) inOrder.verify(this.eventApplier, Mockito.never())).applyState(ArgumentMatchers.eq(1L), (Intent) ArgumentMatchers.any(), (RecordValue) ArgumentMatchers.any());
        ((EventApplier) inOrder.verify(this.eventApplier, TIMEOUT)).applyState(ArgumentMatchers.eq(2L), (Intent) ArgumentMatchers.any(), (RecordValue) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldRestoreKeyGenerator() {
        this.streamProcessorRule.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().key(2L).processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().key(1L).processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(2));
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        Assertions.assertThat(this.streamProcessorRule.getProcessingState().getKeyGenerator().nextKey()).isEqualTo(3L);
    }

    @Test
    public void shouldIgnoreKeysFromDifferentPartition() {
        long encodePartitionId = Protocol.encodePartitionId(0, 1L);
        this.streamProcessorRule.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().key(encodePartitionId).processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0), RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().key(Protocol.encodePartitionId(1, 2L)).processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(2));
        startStreamProcessor(this.typedRecordProcessor, this.eventApplier);
        Assertions.assertThat(this.streamProcessorRule.getProcessingState().getKeyGenerator().nextKey()).isEqualTo(encodePartitionId + 1);
    }

    private void startStreamProcessor(TypedRecordProcessor<?> typedRecordProcessor, EventApplier eventApplier) {
        this.streamProcessorRule.withEventApplierFactory(mutableProcessingState -> {
            return eventApplier;
        }).startTypedStreamProcessor((typedRecordProcessors, typedRecordProcessorContext) -> {
            return typedRecordProcessors.onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, typedRecordProcessor);
        });
    }

    private void awaitUntilProcessed(long j) {
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat((Long) this.streamProcessorRule.getStreamProcessor(0).getLastProcessedPositionAsync().join()).isEqualTo(j);
        });
    }
}
