package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.util.RecordStream;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamProcessingComposite;
import io.camunda.zeebe.engine.util.TestStreams;
import io.camunda.zeebe.logstreams.util.ListLogStorage;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.camunda.zeebe.util.sched.testing.ActorSchedulerRule;
import java.util.Objects;
import org.agrona.CloseHelper;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorInconsistentPositionTest.class */
public final class StreamProcessorInconsistentPositionTest {
    private static final ProcessInstanceRecord PROCESS_INSTANCE_RECORD = Records.processInstance(1);
    private final TemporaryFolder tempFolder = new TemporaryFolder();
    private final AutoCloseableRule closeables = new AutoCloseableRule();
    private final ControlledActorClock clock = new ControlledActorClock();
    private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule(this.clock);

    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(this.tempFolder).around(this.actorSchedulerRule).around(this.closeables);
    private StreamProcessingComposite firstStreamProcessorComposite;
    private StreamProcessingComposite secondStreamProcessorComposite;
    private TestStreams testStreams;

    @Before
    public void setup() {
        this.testStreams = new TestStreams(this.tempFolder, this.closeables, this.actorSchedulerRule.get());
        ListLogStorage listLogStorage = new ListLogStorage();
        this.testStreams.createLogStream(StreamProcessingComposite.getLogName(1), 1, listLogStorage);
        this.testStreams.createLogStream(StreamProcessingComposite.getLogName(2), 2, listLogStorage);
        this.firstStreamProcessorComposite = new StreamProcessingComposite(this.testStreams, 1, DefaultZeebeDbFactory.defaultFactory(), this.actorSchedulerRule.get());
        this.secondStreamProcessorComposite = new StreamProcessingComposite(this.testStreams, 2, DefaultZeebeDbFactory.defaultFactory(), this.actorSchedulerRule.get());
    }

    @After
    public void tearDown() {
        CloseHelper.quietClose(() -> {
            this.testStreams.closeProcessor(StreamProcessingComposite.getLogName(1));
        });
    }

    @Test
    public void shouldNotStartOnInconsistentLog() {
        long writeCommand = this.firstStreamProcessorComposite.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, PROCESS_INSTANCE_RECORD);
        long writeCommand2 = this.firstStreamProcessorComposite.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, PROCESS_INSTANCE_RECORD);
        TestUtil.waitUntil(() -> {
            return new RecordStream(this.testStreams.events(StreamProcessingComposite.getLogName(1))).onlyProcessInstanceRecords().withIntent(ProcessInstanceIntent.ACTIVATE_ELEMENT).count() == 2;
        });
        long writeCommand3 = this.secondStreamProcessorComposite.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, PROCESS_INSTANCE_RECORD);
        long writeCommand4 = this.secondStreamProcessorComposite.writeCommand(ProcessInstanceIntent.ACTIVATE_ELEMENT, PROCESS_INSTANCE_RECORD);
        TestUtil.waitUntil(() -> {
            return new RecordStream(this.testStreams.events(StreamProcessingComposite.getLogName(2))).onlyProcessInstanceRecords().withIntent(ProcessInstanceIntent.ACTIVATE_ELEMENT).count() == 4;
        });
        Assertions.assertThat(writeCommand).isEqualTo(writeCommand3);
        Assertions.assertThat(writeCommand2).isEqualTo(writeCommand4);
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor) Mockito.mock(TypedRecordProcessor.class);
        StreamProcessor startTypedStreamProcessorNotAwaitOpening = this.firstStreamProcessorComposite.startTypedStreamProcessorNotAwaitOpening((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onCommand(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ACTIVATE_ELEMENT, typedRecordProcessor);
        });
        Objects.requireNonNull(startTypedStreamProcessorNotAwaitOpening);
        TestUtil.waitUntil(startTypedStreamProcessorNotAwaitOpening::isFailed);
        Assertions.assertThat(startTypedStreamProcessorNotAwaitOpening.isFailed()).isTrue();
    }
}
