package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.api.EmptyProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamPlatform;
import io.camunda.zeebe.engine.util.StreamPlatformExtension;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationWithTimeout;

/**
 * Verifies that, when several {@link RecordProcessor}s are registered, only the processor whose
 * {@link RecordProcessor#accepts(ValueType)} returns {@code true} for a record's value type is
 * asked to process or replay that record.
 */
@ExtendWith({StreamPlatformExtension.class})
final class StreamProcessorMultipleProcessorsTest {
    /** Upper bound, in milliseconds, to wait for an asynchronous interaction with a mock. */
    private static final long TIMEOUT_MILLIS = 2000;

    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);
    private static final ProcessInstanceRecord RECORD = Records.processInstance(1);
    private static final JobRecord JOB_RECORD = Records.job(1);

    // Never assigned in this class; presumably injected by StreamPlatformExtension - TODO confirm.
    private StreamPlatform streamPlatform;

    @Test
    void shouldChooseTheRightProcessorToProcess() {
        // given - one processor per value type, both registered before the stream processor starts
        RecordProcessor processInstanceProcessor = createRecordProcessorFor(ValueType.PROCESS_INSTANCE);
        RecordProcessor jobProcessor = createRecordProcessorFor(ValueType.JOB);
        this.streamPlatform
                .withRecordProcessors(List.of(processInstanceProcessor, jobProcessor))
                .startStreamProcessor();

        // when - a process instance command is written
        this.streamPlatform.writeBatch(
                RecordToWrite.command()
                        .processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD)
                        .causedBy(0));

        // then - the process instance processor is asked to process it
        Mockito.verify(processInstanceProcessor, TIMEOUT)
                .process(matches(ValueType.PROCESS_INSTANCE), ArgumentMatchers.any());

        // when - a job command is written
        this.streamPlatform.writeBatch(
                RecordToWrite.command().job(JobIntent.COMPLETE, JOB_RECORD).causedBy(0));

        // then - the job processor is asked to process it
        Mockito.verify(jobProcessor, TIMEOUT)
                .process(matches(ValueType.JOB), ArgumentMatchers.any());
    }

    /**
     * Registers a Mockito argument matcher that accepts only records of the given value type.
     *
     * <p>The lambda parameter is typed explicitly and the result is returned without a raw cast:
     * inside a cast the generic {@code argThat} call is not a poly expression, so its type
     * parameter would be inferred as {@code Object} and {@code getValueType()} would not resolve.
     *
     * @param valueType the value type a record must have to match
     * @return the placeholder value produced by {@code Mockito.argThat}; only meaningful as an
     *     argument inside a stubbing or verification call
     */
    private TypedRecord<?> matches(ValueType valueType) {
        return Mockito.argThat(
                (TypedRecord<?> typedRecord) -> typedRecord.getValueType().equals(valueType));
    }

    @Test
    void shouldChooseTheRightProcessorToReplay() {
        // given - a command and its follow-up event are written BEFORE the stream processor is
        // started; as the test name suggests, they are expected to be handled via replay
        RecordProcessor processInstanceProcessor = createRecordProcessorFor(ValueType.PROCESS_INSTANCE);
        RecordProcessor jobProcessor = createRecordProcessorFor(ValueType.JOB);
        this.streamPlatform.writeBatch(
                RecordToWrite.command()
                        .processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD),
                RecordToWrite.event()
                        .processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD)
                        .causedBy(0));

        // when
        this.streamPlatform
                .withRecordProcessors(List.of(processInstanceProcessor, jobProcessor))
                .startStreamProcessor();

        // then - only the process instance processor sees a replayed record
        Mockito.verify(processInstanceProcessor, TIMEOUT)
                .replay(matches(ValueType.PROCESS_INSTANCE));
        Mockito.verify(jobProcessor, Mockito.never()).replay(ArgumentMatchers.any());
    }

    /**
     * Creates a mocked {@link RecordProcessor} that accepts exactly the given value type.
     *
     * <p>{@code process} and {@code onProcessingError} are stubbed to return {@link
     * EmptyProcessingResult#INSTANCE} for any arguments. {@code accepts} is stubbed to return
     * {@code true} only for {@code valueType}; for every other value type the unstubbed mock
     * falls back to Mockito's default answer for {@code boolean}, which is {@code false}.
     *
     * @param valueType the only value type the mock reports as accepted
     * @return the configured mock
     */
    private RecordProcessor createRecordProcessorFor(ValueType valueType) {
        RecordProcessor recordProcessor = Mockito.mock(RecordProcessor.class);
        Mockito.when(recordProcessor.process(ArgumentMatchers.any(), ArgumentMatchers.any()))
                .thenReturn(EmptyProcessingResult.INSTANCE);
        Mockito.when(
                        recordProcessor.onProcessingError(
                                ArgumentMatchers.any(),
                                ArgumentMatchers.any(),
                                ArgumentMatchers.any()))
                .thenReturn(EmptyProcessingResult.INSTANCE);
        Mockito.when(recordProcessor.accepts(valueType)).thenReturn(true);
        return recordProcessor;
    }
}
