package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamPlatform;
import io.camunda.zeebe.engine.util.StreamPlatformExtension;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import org.assertj.core.api.AssertionsForClassTypes;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationWithTimeout;

/**
 * Verifies what ends up on the log stream when the mocked {@link RecordProcessor} throws from
 * {@code process} and then throws again from {@code onProcessingError}.
 *
 * <p>Each test stubs the mocked processor, starts the stream processor, writes user commands and
 * then inspects the records on the log stream.
 */
@ExtendWith(StreamPlatformExtension.class)
class StreamProcessorErrorHandlingTest {
    private static final long TIMEOUT_MILLIS = 2000;
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);

    // Never assigned in this class; presumably injected by StreamPlatformExtension - TODO confirm.
    private StreamPlatform streamPlatform;

    /**
     * Expects a {@link RecordType#COMMAND_REJECTION} that points back at the written command when
     * both {@code process} and {@code onProcessingError} throw.
     */
    @Test
    void shouldRejectUserCommandIfProcessingErrorHandlingFailed() {
        // given - processing fails and the error handling fails as well
        RecordProcessor recordProcessor = streamPlatform.getDefaultMockedRecordProcessor();
        Mockito.when(recordProcessor.process(ArgumentMatchers.any(), ArgumentMatchers.any()))
                .thenThrow(new RuntimeException());
        failErrorHandling(recordProcessor);
        streamPlatform.startStreamProcessor();

        // when
        long commandPosition = writeActivateElementCommand();

        // then
        Mockito.verify(recordProcessor, TIMEOUT.times(1))
                .process(ArgumentMatchers.any(), ArgumentMatchers.any());
        Mockito.verify(recordProcessor, TIMEOUT.times(1))
                .onProcessingError(
                        ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        // The reader is scoped so that it is released even when an assertion fails.
        try (LogStreamReader reader = streamPlatform.getLogStream().newLogStreamReader()) {
            reader.seekToFirstEvent();
            // Skip the first record; presumably the command written above - TODO confirm.
            skipRecords(reader, 1);
            Awaitility.await("should write rejection to log")
                    .untilAsserted(
                            () -> AssertionsForClassTypes.assertThat(reader.hasNext()).isTrue());

            LoggedEvent rejection = reader.next();
            RecordMetadata metadata = readMetadata(rejection);
            AssertionsForClassTypes.assertThat(metadata.getRecordType())
                    .isEqualTo(RecordType.COMMAND_REJECTION);
            // Compare against the position returned by writeBatch instead of a hard-coded 1L,
            // matching shouldContinueProcessingEvenIfErrorHandlingFailedForUserCommand.
            AssertionsForClassTypes.assertThat(rejection.getSourceEventPosition())
                    .isEqualTo(commandPosition);
        }
    }

    /**
     * Expects the follow-up event of a second command on the log after the first command failed in
     * both {@code process} and {@code onProcessingError}.
     */
    @Test
    void shouldContinueProcessingEvenIfErrorHandlingFailedForUserCommand() {
        // given - first process call throws, the second returns a prebuilt result
        RecordProcessor recordProcessor = streamPlatform.getDefaultMockedRecordProcessor();
        // The predicate handed to the builder always answers true.
        BufferedProcessingResultBuilder resultBuilder =
                new BufferedProcessingResultBuilder((num, num2) -> true);
        resultBuilder.appendRecordReturnEither(
                1L,
                RecordType.EVENT,
                ProcessInstanceIntent.ELEMENT_ACTIVATED,
                RejectionType.NULL_VAL,
                "",
                Records.processInstance(1L));
        Mockito.when(recordProcessor.process(ArgumentMatchers.any(), ArgumentMatchers.any()))
                .thenThrow(new RuntimeException())
                .thenReturn(resultBuilder.build());
        failErrorHandling(recordProcessor);
        streamPlatform.startStreamProcessor();

        // when - the first command runs into the failing error handling
        writeActivateElementCommand();
        Mockito.verify(recordProcessor, TIMEOUT.times(1))
                .onProcessingError(
                        ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        // ... and a second command is written afterwards
        long secondCommandPosition = writeActivateElementCommand();

        // then
        Mockito.verify(recordProcessor, TIMEOUT.times(2))
                .process(ArgumentMatchers.any(), ArgumentMatchers.any());

        // The reader is scoped so that it is released even when an assertion fails.
        try (LogStreamReader reader = streamPlatform.getLogStream().newLogStreamReader()) {
            reader.seekToFirstEvent();
            // Skip three records; presumably the first command, its rejection and the second
            // command - TODO confirm.
            skipRecords(reader, 3);
            Awaitility.await("should write follow up event of second command to log")
                    .untilAsserted(
                            () -> AssertionsForClassTypes.assertThat(reader.hasNext()).isTrue());

            LoggedEvent followUpEvent = reader.next();
            RecordMetadata metadata = readMetadata(followUpEvent);
            AssertionsForClassTypes.assertThat(metadata.getRecordType())
                    .isEqualTo(RecordType.EVENT);
            AssertionsForClassTypes.assertThat(metadata.getIntent())
                    .isEqualTo(ProcessInstanceIntent.ELEMENT_ACTIVATED);
            AssertionsForClassTypes.assertThat(followUpEvent.getSourceEventPosition())
                    .isEqualTo(secondCommandPosition);
        }
    }

    /** Stubs {@code onProcessingError} on the given mock to throw a {@link RuntimeException}. */
    private static void failErrorHandling(RecordProcessor recordProcessor) {
        Mockito.when(
                        recordProcessor.onProcessingError(
                                ArgumentMatchers.any(),
                                ArgumentMatchers.any(),
                                ArgumentMatchers.any()))
                .thenThrow(new RuntimeException());
    }

    /** Writes one ACTIVATE_ELEMENT user command and returns the value of {@code writeBatch}. */
    private long writeActivateElementCommand() {
        return streamPlatform.writeBatch(
                RecordToWrite.userCommand()
                        .processInstance(
                                ProcessInstanceIntent.ACTIVATE_ELEMENT,
                                Records.processInstance(1L)));
    }

    /** Advances the reader by {@code count} records, discarding them. */
    private static void skipRecords(LogStreamReader reader, int count) {
        for (int skipped = 0; skipped < count; skipped++) {
            reader.next();
        }
    }

    /** Reads the metadata of the given event into a fresh {@link RecordMetadata}. */
    private static RecordMetadata readMetadata(LoggedEvent event) {
        RecordMetadata metadata = new RecordMetadata();
        event.readMetadata(metadata);
        return metadata;
    }
}
