package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.EngineConfiguration;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnBehaviors;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.engine.util.TypedRecordStream;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.stream.api.InterPartitionCommandSender;
import io.camunda.zeebe.stream.api.ProcessingResultBuilder;
import io.camunda.zeebe.test.util.MsgPackUtil;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.FeatureFlags;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/message/MessageStreamProcessorTest.class */
public final class MessageStreamProcessorTest {
    private static final EngineConfiguration DEFAULT_ENGINE_CONFIGURATION = new EngineConfiguration();
    private static final String DEFAULT_TENANT = "<default>";

    @Rule
    public final StreamProcessorRule rule = new StreamProcessorRule();
    private SubscriptionCommandSender spySubscriptionCommandSender;
    private InterPartitionCommandSender mockInterpartitionCommandSender;

    @Before
    public void setup() {
        this.mockInterpartitionCommandSender = (InterPartitionCommandSender) Mockito.mock(InterPartitionCommandSender.class);
        ProcessingResultBuilder processingResultBuilder = (ProcessingResultBuilder) Mockito.mock(ProcessingResultBuilder.class);
        Writers writers = new Writers(() -> {
            return processingResultBuilder;
        }, (EventAppliers) Mockito.mock(EventAppliers.class));
        this.spySubscriptionCommandSender = (SubscriptionCommandSender) Mockito.spy(new SubscriptionCommandSender(1, this.mockInterpartitionCommandSender));
        this.spySubscriptionCommandSender.setWriters(writers);
        this.rule.startTypedStreamProcessor((typedRecordProcessors, typedRecordProcessorContext) -> {
            MessageEventProcessors.addMessageProcessors((BpmnBehaviors) Mockito.mock(BpmnBehaviors.class), typedRecordProcessors, typedRecordProcessorContext.getProcessingState(), typedRecordProcessorContext.getScheduledTaskStateFactory(), this.spySubscriptionCommandSender, typedRecordProcessorContext.getWriters(), DEFAULT_ENGINE_CONFIGURATION, FeatureFlags.createDefault());
            return typedRecordProcessors;
        });
    }

    @Test
    public void shouldRejectDuplicatedOpenMessageSubscription() {
        UnifiedRecordValue messageSubscription = messageSubscription();
        this.rule.writeCommand(MessageSubscriptionIntent.CREATE, messageSubscription);
        this.rule.writeCommand(MessageSubscriptionIntent.CREATE, messageSubscription);
        Record<MessageSubscriptionRecord> awaitAndGetFirstSubscriptionRejection = awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(awaitAndGetFirstSubscriptionRejection.getIntent()).isEqualTo(MessageSubscriptionIntent.CREATE);
        Assertions.assertThat(awaitAndGetFirstSubscriptionRejection.getRejectionType()).isEqualTo(RejectionType.INVALID_STATE);
        ((SubscriptionCommandSender) Mockito.verify(this.spySubscriptionCommandSender, Mockito.timeout(5000L).times(2))).openProcessMessageSubscription(ArgumentMatchers.eq(messageSubscription.getProcessInstanceKey()), ArgumentMatchers.eq(messageSubscription.getElementInstanceKey()), (DirectBuffer) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), (String) ArgumentMatchers.eq(DEFAULT_TENANT));
    }

    @Test
    public void shouldRetryToCorrelateMessageSubscriptionAfterPublishedMessage() {
        UnifiedRecordValue messageSubscription = messageSubscription();
        UnifiedRecordValue message = message();
        this.rule.writeCommand(MessageSubscriptionIntent.CREATE, messageSubscription);
        this.rule.writeCommand(MessageIntent.PUBLISH, message);
        TestUtil.waitUntil(() -> {
            return this.rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).exists();
        });
        Awaitility.await("retry correlation").untilAsserted(() -> {
            this.rule.getClock().addTime(MessageObserver.SUBSCRIPTION_CHECK_INTERVAL.plus(MessageObserver.SUBSCRIPTION_TIMEOUT));
            ((InterPartitionCommandSender) Mockito.verify(this.mockInterpartitionCommandSender, Mockito.timeout(100L).times(2))).sendCommand(ArgumentMatchers.eq(0), (ValueType) ArgumentMatchers.eq(ValueType.PROCESS_MESSAGE_SUBSCRIPTION), (Intent) ArgumentMatchers.eq(ProcessMessageSubscriptionIntent.CORRELATE), (UnifiedRecordValue) ArgumentMatchers.any());
        });
    }

    @Test
    public void shouldRetryToCorrelateMessageSubscriptionAfterOpenedSubscription() {
        UnifiedRecordValue messageSubscription = messageSubscription();
        this.rule.writeCommand(MessageIntent.PUBLISH, message());
        this.rule.writeCommand(MessageSubscriptionIntent.CREATE, messageSubscription);
        TestUtil.waitUntil(() -> {
            return this.rule.events().onlyMessageSubscriptionRecords().withIntent(MessageSubscriptionIntent.CREATED).exists();
        });
        Awaitility.await("retry correlation").untilAsserted(() -> {
            this.rule.getClock().addTime(MessageObserver.SUBSCRIPTION_CHECK_INTERVAL.plus(MessageObserver.SUBSCRIPTION_TIMEOUT));
            ((InterPartitionCommandSender) Mockito.verify(this.mockInterpartitionCommandSender, Mockito.timeout(100L).times(2))).sendCommand(ArgumentMatchers.eq(0), (ValueType) ArgumentMatchers.eq(ValueType.PROCESS_MESSAGE_SUBSCRIPTION), (Intent) ArgumentMatchers.eq(ProcessMessageSubscriptionIntent.CORRELATE), (UnifiedRecordValue) ArgumentMatchers.any());
        });
    }

    @Test
    public void shouldRejectCorrelateIfMessageSubscriptionClosed() {
        UnifiedRecordValue messageSubscription = messageSubscription();
        this.rule.writeCommand(MessageIntent.PUBLISH, message());
        this.rule.writeCommand(MessageSubscriptionIntent.CREATE, messageSubscription);
        TestUtil.waitUntil(() -> {
            return this.rule.events().onlyMessageSubscriptionRecords().withIntent(MessageSubscriptionIntent.CREATED).exists();
        });
        this.rule.writeCommand(MessageSubscriptionIntent.DELETE, messageSubscription);
        this.rule.writeCommand(MessageSubscriptionIntent.CORRELATE, messageSubscription);
        Record<MessageSubscriptionRecord> awaitAndGetFirstSubscriptionRejection = awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(awaitAndGetFirstSubscriptionRejection.getIntent()).isEqualTo(MessageSubscriptionIntent.CORRELATE);
        Assertions.assertThat(awaitAndGetFirstSubscriptionRejection.getRejectionType()).isEqualTo(RejectionType.NOT_FOUND);
    }

    @Test
    public void shouldRejectDuplicatedCloseMessageSubscription() {
        UnifiedRecordValue messageSubscription = messageSubscription();
        this.rule.writeCommand(MessageSubscriptionIntent.CREATE, messageSubscription);
        TestUtil.waitUntil(() -> {
            return this.rule.events().onlyMessageSubscriptionRecords().withIntent(MessageSubscriptionIntent.CREATED).exists();
        });
        this.rule.writeCommand(MessageSubscriptionIntent.DELETE, messageSubscription);
        this.rule.writeCommand(MessageSubscriptionIntent.DELETE, messageSubscription);
        Record<MessageSubscriptionRecord> awaitAndGetFirstSubscriptionRejection = awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(awaitAndGetFirstSubscriptionRejection.getIntent()).isEqualTo(MessageSubscriptionIntent.DELETE);
        Assertions.assertThat(awaitAndGetFirstSubscriptionRejection.getRejectionType()).isEqualTo(RejectionType.NOT_FOUND);
        ((SubscriptionCommandSender) Mockito.verify(this.spySubscriptionCommandSender, Mockito.timeout(5000L).times(2))).closeProcessMessageSubscription(ArgumentMatchers.eq(messageSubscription.getProcessInstanceKey()), ArgumentMatchers.eq(messageSubscription.getElementInstanceKey()), (DirectBuffer) ArgumentMatchers.any(DirectBuffer.class), (String) ArgumentMatchers.eq(DEFAULT_TENANT));
    }

    @Test
    public void shouldNotCorrelateNewMessagesIfSubscriptionNotCorrelatable() {
        UnifiedRecordValue messageSubscription = messageSubscription();
        UnifiedRecordValue message = message();
        this.rule.writeCommand(MessageSubscriptionIntent.CREATE, messageSubscription);
        this.rule.writeCommand(MessageIntent.PUBLISH, message);
        TestUtil.waitUntil(() -> {
            return this.rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).exists();
        });
        this.rule.writeCommand(MessageIntent.PUBLISH, message);
        ((SubscriptionCommandSender) Mockito.verify(this.spySubscriptionCommandSender, Mockito.timeout(5000L).times(1))).correlateProcessMessageSubscription(ArgumentMatchers.eq(messageSubscription.getProcessInstanceKey()), ArgumentMatchers.eq(messageSubscription.getElementInstanceKey()), (DirectBuffer) ArgumentMatchers.any(), (DirectBuffer) ArgumentMatchers.any(), ArgumentMatchers.eq(((Record) this.rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).getFirst()).getKey()), (DirectBuffer) ArgumentMatchers.any(), (DirectBuffer) ArgumentMatchers.any(), (String) ArgumentMatchers.eq(DEFAULT_TENANT));
    }

    @Test
    public void shouldCorrelateNewMessagesIfSubscriptionIsReusable() {
        UnifiedRecordValue messageSubscription = messageSubscription();
        UnifiedRecordValue message = message();
        messageSubscription.setInterrupting(false);
        this.rule.writeCommand(MessageSubscriptionIntent.CREATE, messageSubscription);
        this.rule.writeCommand(MessageIntent.PUBLISH, message);
        this.rule.writeCommand(MessageSubscriptionIntent.CORRELATE, messageSubscription);
        this.rule.writeCommand(MessageIntent.PUBLISH, message);
        TestUtil.waitUntil(() -> {
            return ((TypedRecordStream) this.rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).limit(2L)).count() == 2;
        });
        long key = ((Record) this.rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).getFirst()).getKey();
        long key2 = ((Record) ((TypedRecordStream) this.rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).skip(1L)).getFirst()).getKey();
        ((SubscriptionCommandSender) Mockito.verify(this.spySubscriptionCommandSender, Mockito.timeout(5000L))).correlateProcessMessageSubscription(ArgumentMatchers.eq(messageSubscription.getProcessInstanceKey()), ArgumentMatchers.eq(messageSubscription.getElementInstanceKey()), (DirectBuffer) ArgumentMatchers.eq(messageSubscription.getBpmnProcessIdBuffer()), (DirectBuffer) ArgumentMatchers.any(), ArgumentMatchers.eq(key), (DirectBuffer) ArgumentMatchers.any(), (DirectBuffer) ArgumentMatchers.any(), (String) ArgumentMatchers.eq(DEFAULT_TENANT));
        ((SubscriptionCommandSender) Mockito.verify(this.spySubscriptionCommandSender, Mockito.timeout(5000L))).correlateProcessMessageSubscription(ArgumentMatchers.eq(messageSubscription.getProcessInstanceKey()), ArgumentMatchers.eq(messageSubscription.getElementInstanceKey()), (DirectBuffer) ArgumentMatchers.eq(messageSubscription.getBpmnProcessIdBuffer()), (DirectBuffer) ArgumentMatchers.any(), ArgumentMatchers.eq(key2), (DirectBuffer) ArgumentMatchers.any(), (DirectBuffer) ArgumentMatchers.any(), (String) ArgumentMatchers.eq(DEFAULT_TENANT));
    }

    @Test
    public void shouldCorrelateMultipleMessagesOneBeforeOpenOneAfter() {
        UnifiedRecordValue interrupting = messageSubscription().setInterrupting(false);
        UnifiedRecordValue variables = message().setVariables(MsgPackUtil.asMsgPack("foo", "bar"));
        UnifiedRecordValue variables2 = message().setVariables(MsgPackUtil.asMsgPack("foo", "baz"));
        this.rule.writeCommand(MessageIntent.PUBLISH, variables);
        this.rule.writeCommand(MessageSubscriptionIntent.CREATE, interrupting);
        TestUtil.waitUntil(() -> {
            return this.rule.events().onlyMessageSubscriptionRecords().withIntent(MessageSubscriptionIntent.CREATED).exists();
        });
        this.rule.writeCommand(MessageSubscriptionIntent.CORRELATE, interrupting);
        this.rule.writeCommand(MessageIntent.PUBLISH, variables2);
        assertAllMessagesReceived(interrupting);
    }

    @Test
    public void shouldCorrelateMultipleMessagesTwoBeforeOpen() {
        UnifiedRecordValue interrupting = messageSubscription().setInterrupting(false);
        UnifiedRecordValue variables = message().setVariables(MsgPackUtil.asMsgPack("foo", "bar"));
        UnifiedRecordValue variables2 = message().setVariables(MsgPackUtil.asMsgPack("foo", "baz"));
        this.rule.writeCommand(MessageIntent.PUBLISH, variables);
        this.rule.writeCommand(MessageIntent.PUBLISH, variables2);
        this.rule.writeCommand(MessageSubscriptionIntent.CREATE, interrupting);
        TestUtil.waitUntil(() -> {
            return this.rule.events().onlyMessageSubscriptionRecords().withIntent(MessageSubscriptionIntent.CREATED).exists();
        });
        this.rule.writeCommand(MessageSubscriptionIntent.CORRELATE, interrupting);
        assertAllMessagesReceived(interrupting);
    }

    @Test
    public void shouldCorrelateToFirstSubscriptionAfterRejection() {
        UnifiedRecordValue message = message();
        UnifiedRecordValue elementInstanceKey = messageSubscription().setElementInstanceKey(5L);
        UnifiedRecordValue elementInstanceKey2 = messageSubscription().setElementInstanceKey(10L);
        this.rule.writeCommand(MessageIntent.PUBLISH, message);
        this.rule.writeCommand(MessageSubscriptionIntent.CREATE, elementInstanceKey);
        this.rule.writeCommand(MessageSubscriptionIntent.CREATE, elementInstanceKey2);
        TestUtil.waitUntil(() -> {
            return ((TypedRecordStream) this.rule.events().onlyMessageSubscriptionRecords().withIntent(MessageSubscriptionIntent.CREATED).filter(record -> {
                return record.getValue().getElementInstanceKey() == elementInstanceKey2.getElementInstanceKey();
            })).exists();
        });
        long key = ((Record) this.rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).getFirst()).getKey();
        elementInstanceKey.setMessageKey(key);
        this.rule.writeCommand(MessageSubscriptionIntent.REJECT, elementInstanceKey);
        ((SubscriptionCommandSender) Mockito.verify(this.spySubscriptionCommandSender, Mockito.timeout(5000L))).correlateProcessMessageSubscription(ArgumentMatchers.eq(elementInstanceKey.getProcessInstanceKey()), ArgumentMatchers.eq(elementInstanceKey.getElementInstanceKey()), (DirectBuffer) ArgumentMatchers.eq(elementInstanceKey.getBpmnProcessIdBuffer()), (DirectBuffer) ArgumentMatchers.any(DirectBuffer.class), ArgumentMatchers.eq(key), (DirectBuffer) ArgumentMatchers.any(DirectBuffer.class), (DirectBuffer) ArgumentMatchers.eq(elementInstanceKey.getCorrelationKeyBuffer()), (String) ArgumentMatchers.eq(DEFAULT_TENANT));
        ((SubscriptionCommandSender) Mockito.verify(this.spySubscriptionCommandSender, Mockito.timeout(5000L))).correlateProcessMessageSubscription(ArgumentMatchers.eq(elementInstanceKey2.getProcessInstanceKey()), ArgumentMatchers.eq(elementInstanceKey2.getElementInstanceKey()), (DirectBuffer) ArgumentMatchers.eq(elementInstanceKey2.getBpmnProcessIdBuffer()), (DirectBuffer) ArgumentMatchers.any(DirectBuffer.class), ArgumentMatchers.eq(key), (DirectBuffer) ArgumentMatchers.any(DirectBuffer.class), (DirectBuffer) ArgumentMatchers.eq(elementInstanceKey2.getCorrelationKeyBuffer()), (String) ArgumentMatchers.eq(DEFAULT_TENANT));
    }

    private void assertAllMessagesReceived(MessageSubscriptionRecord messageSubscriptionRecord) {
        TestUtil.waitUntil(() -> {
            return ((TypedRecordStream) this.rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).limit(2L)).count() == 2;
        });
        long key = ((Record) this.rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).getFirst()).getKey();
        long key2 = ((Record) ((TypedRecordStream) this.rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).skip(1L)).getFirst()).getKey();
        ((SubscriptionCommandSender) Mockito.verify(this.spySubscriptionCommandSender, Mockito.timeout(5000L))).correlateProcessMessageSubscription(ArgumentMatchers.eq(messageSubscriptionRecord.getProcessInstanceKey()), ArgumentMatchers.eq(messageSubscriptionRecord.getElementInstanceKey()), (DirectBuffer) ArgumentMatchers.eq(messageSubscriptionRecord.getBpmnProcessIdBuffer()), (DirectBuffer) ArgumentMatchers.eq(messageSubscriptionRecord.getMessageNameBuffer()), ArgumentMatchers.eq(key), (DirectBuffer) ArgumentMatchers.any(), (DirectBuffer) ArgumentMatchers.eq(messageSubscriptionRecord.getCorrelationKeyBuffer()), (String) ArgumentMatchers.eq(DEFAULT_TENANT));
        ((SubscriptionCommandSender) Mockito.verify(this.spySubscriptionCommandSender, Mockito.timeout(5000L))).correlateProcessMessageSubscription(ArgumentMatchers.eq(messageSubscriptionRecord.getProcessInstanceKey()), ArgumentMatchers.eq(messageSubscriptionRecord.getElementInstanceKey()), (DirectBuffer) ArgumentMatchers.eq(messageSubscriptionRecord.getBpmnProcessIdBuffer()), (DirectBuffer) ArgumentMatchers.eq(messageSubscriptionRecord.getMessageNameBuffer()), ArgumentMatchers.eq(key2), (DirectBuffer) ArgumentMatchers.any(), (DirectBuffer) ArgumentMatchers.eq(messageSubscriptionRecord.getCorrelationKeyBuffer()), (String) ArgumentMatchers.eq(DEFAULT_TENANT));
    }

    private MessageSubscriptionRecord messageSubscription() {
        MessageSubscriptionRecord messageSubscriptionRecord = new MessageSubscriptionRecord();
        messageSubscriptionRecord.setProcessInstanceKey(1L).setElementInstanceKey(2L).setBpmnProcessId(BufferUtil.wrapString("process")).setMessageKey(-1L).setMessageName(BufferUtil.wrapString("order canceled")).setCorrelationKey(BufferUtil.wrapString("order-123")).setInterrupting(true);
        return messageSubscriptionRecord;
    }

    private MessageRecord message() {
        MessageRecord messageRecord = new MessageRecord();
        messageRecord.setName(BufferUtil.wrapString("order canceled")).setCorrelationKey(BufferUtil.wrapString("order-123")).setTimeToLive(Duration.ofSeconds(10L).toMillis()).setVariables(MsgPackUtil.asMsgPack("orderId", "order-123"));
        return messageRecord;
    }

    private Record<MessageSubscriptionRecord> awaitAndGetFirstSubscriptionRejection() {
        TestUtil.waitUntil(() -> {
            return this.rule.events().onlyMessageSubscriptionRecords().onlyRejections().findFirst().isPresent();
        });
        return (Record) this.rule.events().onlyMessageSubscriptionRecords().onlyRejections().findFirst().get();
    }
}
