package io.camunda.zeebe.engine.processing.bpmn.compensation;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.CompensationSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.VariableIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.BpmnEventType;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import org.assertj.core.api.Assertions;
import org.assertj.core.groups.Tuple;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/bpmn/compensation/CompensationEventExecutionTest.class */
public class CompensationEventExecutionTest {

    // Engine rule shared by every test in this class (registered as a JUnit class rule),
    // created via EngineRule.singlePartition().
    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition();
    // BPMN process id passed to ofBpmnProcessId(...) when the tests create process instances.
    private static final String PROCESS_ID = "compensation-process";
    // Job types passed to withType(...) to complete the compensable service tasks.
    private static final String SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY = "compensableActivity";
    private static final String SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY2 = "compensableActivity2";
    // Job types passed to withType(...) to complete the compensation handler service tasks.
    private static final String SERVICE_TASK_TYPE_COMPENSATION_HANDLER = "compensationHandler";
    private static final String SERVICE_TASK_TYPE_COMPENSATION_HANDLER2 = "compensationHandler2";

    // Per-test JUnit rule; its behavior is defined by RecordingExporterTestWatcher (not visible here).
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();

    /**
     * Deploys a process (start event, user task, compensation intermediate throw event, end event),
     * completes the user task job and expects the throw event to be activated and completed
     * before the end event and the process instance complete.
     */
    @Test
    public void shouldExecuteAProcessWithCompensationIntermediateEvent() {
        // given
        ENGINE.deployment()
            .withXmlResource(
                Bpmn.createExecutableProcess(PROCESS_ID)
                    .startEvent()
                    .userTask()
                    .intermediateThrowEvent(
                        "compensation-event",
                        throwEvent -> {
                            throwEvent.compensateEventDefinition().compensateEventDefinitionDone();
                        })
                    .endEvent()
                    .done())
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("io.camunda.zeebe:userTask").complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getIntent(),
                r -> r.getValue().getBpmnEventType())
            .containsSubsequence(
                Assertions.tuple(
                    BpmnElementType.USER_TASK,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    BpmnEventType.UNSPECIFIED),
                Assertions.tuple(
                    BpmnElementType.INTERMEDIATE_THROW_EVENT,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED,
                    BpmnEventType.COMPENSATION),
                Assertions.tuple(
                    BpmnElementType.INTERMEDIATE_THROW_EVENT,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    BpmnEventType.COMPENSATION),
                Assertions.tuple(
                    BpmnElementType.END_EVENT,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    BpmnEventType.NONE),
                Assertions.tuple(
                    BpmnElementType.PROCESS,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    BpmnEventType.UNSPECIFIED));
    }

    /**
     * Deploys a process (start event, user task, compensation end event), completes the user task
     * job and expects the end event to be activated and completed before the process instance
     * completes.
     */
    @Test
    public void shouldExecuteAProcessWithCompensationEndEvent() {
        // given
        ENGINE.deployment()
            .withXmlResource(
                Bpmn.createExecutableProcess(PROCESS_ID)
                    .startEvent()
                    .userTask()
                    .endEvent(
                        "compensation-event",
                        compensationEnd -> {
                            compensationEnd.compensateEventDefinition().compensateEventDefinitionDone();
                        })
                    .done())
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("io.camunda.zeebe:userTask").complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getIntent(),
                r -> r.getValue().getBpmnEventType())
            .containsSubsequence(
                Assertions.tuple(
                    BpmnElementType.USER_TASK,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    BpmnEventType.UNSPECIFIED),
                Assertions.tuple(
                    BpmnElementType.END_EVENT,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED,
                    BpmnEventType.COMPENSATION),
                Assertions.tuple(
                    BpmnElementType.END_EVENT,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    BpmnEventType.COMPENSATION),
                Assertions.tuple(
                    BpmnElementType.PROCESS,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    BpmnEventType.UNSPECIFIED));
    }

    /**
     * Completes the compensable activity and its compensation handler, then verifies the first
     * three compensation subscription records of the instance:
     *
     * <ul>
     *   <li>all of them carry the tenant, process and compensable-activity data taken from the
     *       activated "ActivityToCompensate" element;
     *   <li>their intents are CREATED, TRIGGERED, COMPLETED in that order, and the throw event /
     *       handler instance keys are only set from TRIGGERED on.
     * </ul>
     */
    @Test
    public void shouldCreateAndUpdateCompensationSubscriptionForCompletedTask() {
        // given
        ENGINE.deployment()
            .withXmlResource(createModelFromClasspathResource("/compensation/compensation-throw-event.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then
        // Typed records (instead of raw Record plus casts) so that the value accessors below
        // are checked by the compiler.
        final Record<ProcessInstanceRecordValue> compensableActivity =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withProcessInstanceKey(processInstanceKey)
                .withElementId("ActivityToCompensate")
                .getFirst();
        final Record<ProcessInstanceRecordValue> throwEvent =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withProcessInstanceKey(processInstanceKey)
                .withElementType(BpmnElementType.INTERMEDIATE_THROW_EVENT)
                .withEventType(BpmnEventType.COMPENSATION)
                .getFirst();
        final Record<ProcessInstanceRecordValue> compensationHandler =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withProcessInstanceKey(processInstanceKey)
                .withElementId("CompensationHandler")
                .getFirst();

        // Static subscription data: identical on all three records, hence containsOnly with one tuple.
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(3L))
            .extracting(Record::getValue)
            .extracting(
                value -> value.getTenantId(),
                value -> value.getProcessInstanceKey(),
                value -> value.getProcessDefinitionKey(),
                value -> value.getCompensableActivityId(),
                value -> value.getCompensableActivityInstanceKey(),
                value -> value.getCompensableActivityScopeKey(),
                value -> value.getCompensationHandlerId())
            .containsOnly(
                Assertions.tuple(
                    compensableActivity.getValue().getTenantId(),
                    processInstanceKey,
                    compensableActivity.getValue().getProcessDefinitionKey(),
                    "ActivityToCompensate",
                    compensableActivity.getKey(),
                    compensableActivity.getValue().getFlowScopeKey(),
                    "CompensationHandler"));

        // Lifecycle data: changes as the subscription moves from CREATED to COMPLETED.
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(3L))
            .extracting(
                Record::getIntent,
                r -> r.getValue().getThrowEventId(),
                r -> r.getValue().getThrowEventInstanceKey(),
                r -> r.getValue().getCompensationHandlerInstanceKey())
            .containsSequence(
                Assertions.tuple(CompensationSubscriptionIntent.CREATED, "", -1L, -1L),
                Assertions.tuple(
                    CompensationSubscriptionIntent.TRIGGERED,
                    "CompensationThrowEvent",
                    throwEvent.getKey(),
                    compensationHandler.getKey()),
                Assertions.tuple(
                    CompensationSubscriptionIntent.COMPLETED,
                    "CompensationThrowEvent",
                    throwEvent.getKey(),
                    compensationHandler.getKey()));
    }

    /**
     * Completes the compensable activity and the compensation handler job and expects, in order:
     * the activity completed, the throw event activated, the compensation boundary event running
     * through its lifecycle, the handler activated and completed, the throw event completed and
     * finally the process completed.
     */
    @Test
    public void shouldActivateAndCompleteCompensationHandlerForIntermediateThrowEvent() {
        // given
        ENGINE.deployment()
            .withXmlResource(createModelFromClasspathResource("/compensation/compensation-throw-event.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                r -> r.getIntent(),
                r -> r.getValue().getElementId())
            .containsSubsequence(
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "ActivityToCompensate"),
                Assertions.tuple(
                    BpmnElementType.INTERMEDIATE_THROW_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED,
                    "CompensationThrowEvent"),
                Assertions.tuple(
                    BpmnElementType.BOUNDARY_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATING,
                    "CompensationBoundaryEvent"),
                Assertions.tuple(
                    BpmnElementType.BOUNDARY_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED,
                    "CompensationBoundaryEvent"),
                Assertions.tuple(
                    BpmnElementType.BOUNDARY_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETING,
                    "CompensationBoundaryEvent"),
                Assertions.tuple(
                    BpmnElementType.BOUNDARY_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "CompensationBoundaryEvent"),
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED,
                    "CompensationHandler"),
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "CompensationHandler"),
                Assertions.tuple(
                    BpmnElementType.INTERMEDIATE_THROW_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "CompensationThrowEvent"),
                Assertions.tuple(
                    BpmnElementType.PROCESS,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    PROCESS_ID));
    }

    /**
     * Same scenario as the intermediate-throw-event variant, but compensation is thrown by the
     * "CompensationEndEvent" end event: the handler must be activated and completed between the
     * end event's activation and completion.
     */
    @Test
    public void shouldActivateAndCompleteCompensationHandlerForEndEvent() {
        // given
        ENGINE.deployment()
            .withXmlResource(createModelFromClasspathResource("/compensation/compensation-end-event.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                r -> r.getIntent(),
                r -> r.getValue().getElementId())
            .containsSubsequence(
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "ActivityToCompensate"),
                Assertions.tuple(
                    BpmnElementType.END_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED,
                    "CompensationEndEvent"),
                Assertions.tuple(
                    BpmnElementType.BOUNDARY_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATING,
                    "CompensationBoundaryEvent"),
                Assertions.tuple(
                    BpmnElementType.BOUNDARY_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED,
                    "CompensationBoundaryEvent"),
                Assertions.tuple(
                    BpmnElementType.BOUNDARY_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETING,
                    "CompensationBoundaryEvent"),
                Assertions.tuple(
                    BpmnElementType.BOUNDARY_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "CompensationBoundaryEvent"),
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED,
                    "CompensationHandler"),
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "CompensationHandler"),
                Assertions.tuple(
                    BpmnElementType.END_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "CompensationEndEvent"),
                Assertions.tuple(
                    BpmnElementType.PROCESS,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    PROCESS_ID));
    }

    /**
     * Completes two compensable activities and both compensation handlers, then expects both
     * handlers to complete between the throw event's activation and completion, and a COMPLETED
     * subscription record for each handler among the first six subscription records.
     */
    @Test
    public void shouldActivateAndCompleteMultipleCompensationHandlerForThrowEvent() {
        // given
        ENGINE.deployment()
            .withXmlResource(
                createModelFromClasspathResource("/compensation/multiple-compensation-throw-event.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY2).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER2).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                r -> r.getIntent(),
                r -> r.getValue().getElementId())
            .containsSubsequence(
                Assertions.tuple(
                    BpmnElementType.INTERMEDIATE_THROW_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED,
                    "CompensationThrowEvent"),
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "CompensationHandler"),
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "CompensationHandler2"),
                Assertions.tuple(
                    BpmnElementType.INTERMEDIATE_THROW_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "CompensationThrowEvent"),
                Assertions.tuple(
                    BpmnElementType.PROCESS,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    PROCESS_ID));

        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(6L))
            .extracting(r -> r.getIntent(), r -> r.getValue().getCompensationHandlerId())
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler2"));
    }

    /**
     * Completes two compensable activities and both compensation handlers, then expects both
     * handlers to complete between the activation and completion of "CompensationEndEvent",
     * followed by the completion of the process.
     */
    @Test
    public void shouldActivateAndCompleteMultipleCompensationHandlerForEndEvent() {
        // given
        ENGINE.deployment()
            .withXmlResource(
                createModelFromClasspathResource("/compensation/multiple-compensation-end-event.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY2).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER2).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                r -> r.getIntent(),
                r -> r.getValue().getElementId())
            .containsSubsequence(
                Assertions.tuple(
                    BpmnElementType.END_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED,
                    "CompensationEndEvent"),
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "CompensationHandler"),
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "CompensationHandler2"),
                Assertions.tuple(
                    BpmnElementType.END_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    "CompensationEndEvent"),
                Assertions.tuple(
                    BpmnElementType.PROCESS,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_COMPLETED,
                    PROCESS_ID));
    }

    /**
     * Cancels the process instance after the compensable activity job is completed (the handler
     * job is never completed) and expects the throw event, the handler and the process to be
     * terminated, plus a TRIGGERED then DELETED subscription record for the handler.
     */
    @Test
    public void shouldTerminateCompensationHandler() {
        // given
        ENGINE.deployment()
            .withXmlResource(createModelFromClasspathResource("/compensation/compensation-throw-event.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();

        // when
        ENGINE.processInstance().withInstanceKey(processInstanceKey).cancel();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceTerminated())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                r -> r.getIntent(),
                r -> r.getValue().getElementId())
            .containsSubsequence(
                Assertions.tuple(
                    BpmnElementType.INTERMEDIATE_THROW_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED,
                    "CompensationThrowEvent"),
                Assertions.tuple(
                    BpmnElementType.INTERMEDIATE_THROW_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_TERMINATED,
                    "CompensationThrowEvent"),
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_TERMINATED,
                    "CompensationHandler"),
                Assertions.tuple(
                    BpmnElementType.PROCESS,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_TERMINATED,
                    PROCESS_ID));

        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(3L))
            .extracting(r -> r.getIntent(), r -> r.getValue().getCompensationHandlerId())
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.DELETED, "CompensationHandler"));
    }

    /**
     * Completes the user task job of the "no throw event" model and expects a DELETED compensation
     * subscription record among the first two subscription records of the instance.
     */
    @Test
    public void shouldDeleteAllSubscriptionsWhenProcessIsCompletedWithoutTriggerCompensationHandler() {
        // given
        ENGINE.deployment()
            .withXmlResource(createModelFromClasspathResource("/compensation/compensation-no-throw-event.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("io.camunda.zeebe:userTask").complete();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(2L))
            .extracting(r -> r.getValueType(), r -> r.getIntent())
            .contains(
                Assertions.tuple(
                    ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.DELETED));
    }

    /**
     * Completes the user task job, cancels the process instance and expects a DELETED compensation
     * subscription record among the first two subscription records of the instance.
     */
    @Test
    public void shouldDeleteAllSubscriptionsWhenProcessIsTerminatedWithoutTriggerCompensationHandler() {
        // given
        ENGINE.deployment()
            .withXmlResource(
                createModelFromClasspathResource("/compensation/compensation-no-throw-event-terminate.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(processInstanceKey).withType("io.camunda.zeebe:userTask").complete();

        // when
        ENGINE.processInstance().withInstanceKey(processInstanceKey).cancel();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(2L))
            .extracting(r -> r.getValueType(), r -> r.getIntent())
            .contains(
                Assertions.tuple(
                    ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.DELETED));
    }

    /**
     * Completes both compensable activities and both handlers of the embedded-subprocess model and
     * expects a TRIGGERED subscription record for each handler among the first eight subscription
     * records.
     */
    @Test
    public void shouldTriggerCompensationHandlerInSubprocesses() {
        // given
        ENGINE.deployment()
            .withXmlResource(
                createModelFromClasspathResource("/compensation/compensation-embedded-subprocess.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY2).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER2).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(8L))
            .extracting(r -> r.getIntent(), r -> r.getValue().getCompensationHandlerId())
            .contains(
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler2"));
    }

    /**
     * Completes the compensable activity, "completableActivity" and "NotActivableTask" jobs and
     * expects DELETED subscription records for "embedded-subprocess" and "ActivityToCompensate"
     * among the first four subscription records.
     */
    @Test
    public void shouldNotTriggerCompensationIfSubprocessIsNotCompleted() {
        // given
        ENGINE.deployment()
            .withXmlResource(
                createModelFromClasspathResource("/compensation/compensation-single-embedded-subprocess.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("completableActivity").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("NotActivableTask").complete();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(4L))
            .extracting(r -> r.getIntent(), r -> r.getValue().getCompensableActivityId())
            .contains(
                Assertions.tuple(CompensationSubscriptionIntent.DELETED, "embedded-subprocess"),
                Assertions.tuple(CompensationSubscriptionIntent.DELETED, "ActivityToCompensate"));
    }

    /**
     * Completes both compensable activities but only the second handler job, then expects the
     * subscription for "CompensationHandler2" to be COMPLETED and, afterwards, the subscription
     * for "CompensationHandler" to be DELETED (within the first five subscription records).
     */
    @Test
    public void shouldNotTriggerCompensationOnParentScope() {
        // given
        ENGINE.deployment()
            .withXmlResource(
                createModelFromClasspathResource("/compensation/compensation-embedded-subprocess-parent.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY2).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER2).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(5L))
            .extracting(r -> r.getIntent(), r -> r.getValue().getCompensationHandlerId())
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler2"),
                Assertions.tuple(CompensationSubscriptionIntent.DELETED, "CompensationHandler"));
    }

    /**
     * Completes the compensable activity, the "B" job and the compensation handler job, then
     * expects a compensation service task and the compensation throw event to complete before
     * the process instance completes.
     */
    @Test
    public void shouldNotCreateSubprocessSubscriptionWithoutChildSubscription() {
        // given
        ENGINE.deployment()
            .withXmlResource(
                createModelFromClasspathResource("/compensation/subprocess-after-compensation-activity.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                r -> r.getIntent())
            .containsSubsequence(
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(
                    BpmnElementType.INTERMEDIATE_THROW_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(
                    BpmnElementType.PROCESS,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Completes three compensable activity jobs and three handler jobs of the multi-instance model
     * and expects three TRIGGERED followed by three COMPLETED subscription records for
     * "CompensationHandler", and the process instance to complete.
     */
    @Test
    public void shouldInvokeCompensationHandlerTheSameAmountAsExecuted() {
        // given
        ENGINE.deployment()
            .withXmlResource(createModelFromClasspathResource("/compensation/compensation-multi-instance.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        completeJobs(processInstanceKey, SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY, 3);
        completeJobs(processInstanceKey, SERVICE_TASK_TYPE_COMPENSATION_HANDLER, 3);

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(20L))
            .extracting(r -> r.getIntent(), r -> r.getValue().getCompensationHandlerId())
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"));

        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(r -> r.getValue().getBpmnElementType(), r -> r.getIntent())
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Completes three compensable activity jobs, then activates the "activity" jobs and throws an
     * error on the second one while completing the others. After the handler job is completed, a
     * TRIGGERED then COMPLETED subscription record for "CompensationHandler" is expected within
     * the first eight subscription records, and the process instance must complete.
     */
    @Test
    public void shouldTriggerOnlyCorrectHandlerForMultiInstance() {
        // given
        ENGINE.deployment()
            .withXmlResource(createModelFromClasspathResource("/compensation/compensation-throw-error.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        completeJobs(processInstanceKey, SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY, 3);

        // when
        // 1-based position of the job currently being handled; only the second job throws an error.
        final AtomicInteger jobPosition = new AtomicInteger(1);
        ENGINE.jobs()
            .withType("activity")
            .activate()
            .getValue()
            .getJobKeys()
            .forEach(
                jobKey -> {
                    if (jobPosition.get() != 2) {
                        ENGINE.job().withKey(jobKey.longValue()).complete();
                    } else {
                        ENGINE.job().withKey(jobKey.longValue()).throwError();
                    }
                    jobPosition.getAndIncrement();
                });
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(8L))
            .extracting(r -> r.getIntent(), r -> r.getValue().getCompensationHandlerId())
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"));

        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(r -> r.getValue().getBpmnElementType(), r -> r.getIntent())
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Completes the compensable activity and the handler job of the multi-throw-event model, then
     * expects the process instance to complete and, within the first six subscription records, a
     * TRIGGERED record with an empty handler id followed by a COMPLETED record for
     * "CompensationHandler".
     */
    @Test
    public void shouldTriggerCompensationHandlerOnlyOnce() {
        // given
        ENGINE.deployment()
            .withXmlResource(
                createModelFromClasspathResource("/compensation/compensation-multi-throw-event.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(r -> r.getValue().getBpmnElementType(), r -> r.getIntent())
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));

        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(6L))
            .extracting(r -> r.getIntent(), r -> r.getValue().getCompensationHandlerId())
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, ""),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"));
    }

    /**
     * Completes both compensable activities and both handlers of the subprocess-multi-handler
     * model, then expects (within the first nine subscription records) COMPLETED records for
     * "CompensationHandler" and "CompensationHandler2" followed by a COMPLETED record with an
     * empty handler id, and the process instance to complete.
     */
    @Test
    public void shouldCompleteSubprocessAfterAllCompensationHandlerAreCompleted() {
        // given
        ENGINE.deployment()
            .withXmlResource(
                createModelFromClasspathResource("/compensation/compensation-subprocess-multi-handler.bpmn"))
            .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY2).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER2).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.compensationSubscriptionRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limit(9L))
            .extracting(r -> r.getIntent(), r -> r.getValue().getCompensationHandlerId())
            .containsSubsequence(
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler2"),
                Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, ""));

        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(r -> r.getValue().getBpmnElementType(), r -> r.getIntent())
            .containsSubsequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Deploys {@code /compensation/compensation-multi-subprocess.bpmn} and completes the compensable
     * activity job followed by the {@code activity2} and {@code activity} jobs.
     *
     * <p>Within the first six compensation subscription records, the {@code CompensationHandler}
     * subscription must be CREATED and later DELETED. The process instance must complete.
     */
    @Test
    public void shouldNotTriggerCompensationHandlerIfTheParentSubprocessIsNotCompleted() {
        // given
        ENGINE.deployment()
                .withXmlResource(createModelFromClasspathResource("/compensation/compensation-multi-subprocess.bpmn"))
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("activity2").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("activity").complete();

        // then
        Assertions.assertThat(RecordingExporter.compensationSubscriptionRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limit(6L))
                .extracting(rec -> rec.getIntent(), rec -> rec.getValue().getCompensationHandlerId())
                .containsSubsequence(
                        Assertions.tuple(CompensationSubscriptionIntent.CREATED, "CompensationHandler"),
                        Assertions.tuple(CompensationSubscriptionIntent.DELETED, "CompensationHandler"));

        Assertions.assertThat(RecordingExporter.processInstanceRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limitToProcessInstanceCompleted())
                .extracting(rec -> rec.getValue().getBpmnElementType(), rec -> rec.getIntent())
                .containsSubsequence(Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Builds a process that forks into two branches. One branch is {@code subprocess-1}, which
     * contains the multi-instance {@code subprocess-2} (input collection {@code [1,2,3]}, task
     * {@code A} with compensation handler {@code Undo-A}) followed by task {@code B}. The other
     * branch is task {@code C} followed by a compensation throw event.
     *
     * <p>The three {@code A} jobs are completed, then {@code C}, then {@code B}. The throw event must
     * activate and complete between the activation and the completion of {@code B}, and
     * {@code Undo-A} must never be activated.
     */
    @Test
    public void shouldNotTriggerHandlersForMultiInstanceInsideNotCompletedSubprocess() {
        // given
        // The inner subprocess is declared inline so its builder type is inferred from subProcess(...).
        ENGINE.deployment()
                .withXmlResource(Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .parallelGateway("fork")
                        .subProcess("subprocess-1", outerSubprocess -> outerSubprocess
                                .embeddedSubProcess()
                                .startEvent()
                                .subProcess("subprocess-2", innerSubprocess -> innerSubprocess
                                        .multiInstance(loop -> loop.zeebeInputCollectionExpression("[1,2,3]"))
                                        .embeddedSubProcess()
                                        .startEvent()
                                        .serviceTask("A", taskA -> taskA.zeebeJobType("A"))
                                        .boundaryEvent()
                                        .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                                .serviceTask("B", taskB -> taskB.zeebeJobType("B"))
                                .endEvent())
                        .parallelGateway("join")
                        .moveToNode("fork")
                        .serviceTask("C", taskC -> taskC.zeebeJobType("C"))
                        .intermediateThrowEvent("compensation-throw-event")
                        .compensateEventDefinition()
                        .compensateEventDefinitionDone()
                        .connectTo("join")
                        .endEvent()
                        .done())
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        completeJobs(processInstanceKey, "A", 3);
        // C completes, and the compensation is thrown, while B is still pending inside subprocess-1.
        ENGINE.job().ofInstance(processInstanceKey).withType("C").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();

        // then
        Assertions.assertThat(RecordingExporter.processInstanceRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limitToProcessInstanceCompleted())
                .extracting(rec -> rec.getValue().getElementId(), rec -> rec.getIntent())
                .containsSubsequence(
                        Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED))
                .doesNotContain(Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Builds a process that forks into two branches. One branch is the multi-instance
     * {@code subprocess} (input collection {@code [1,2,3]}); each instance runs {@code subprocess-2}
     * (task {@code A} with compensation handler {@code Undo-A}) followed by task {@code B}. The
     * other branch is task {@code C} followed by a compensation throw event.
     *
     * <p>All {@code A} jobs are completed, then only the first {@code B} job, then {@code C}, and
     * finally the remaining {@code B} jobs. {@code Undo-A} must never be activated and the process
     * instance must complete.
     */
    @Test
    public void shouldNotTriggerHandlersForSubprocessIfParentMultiInstancesSubprocessesAreNotCompleted() {
        // given
        // The inner subprocess is declared inline so its builder type is inferred from subProcess(...).
        ENGINE.deployment()
                .withXmlResource(Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .parallelGateway("fork")
                        .subProcess("subprocess", outerSubprocess -> outerSubprocess
                                .multiInstance(loop -> loop.zeebeInputCollectionExpression("[1,2,3]"))
                                .embeddedSubProcess()
                                .startEvent()
                                .subProcess("subprocess-2", innerSubprocess -> innerSubprocess
                                        .embeddedSubProcess()
                                        .startEvent()
                                        .serviceTask("A", taskA -> taskA.zeebeJobType("A"))
                                        .boundaryEvent()
                                        .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                                .serviceTask("B", taskB -> taskB.zeebeJobType("B")))
                        .parallelGateway("join")
                        .moveToNode("fork")
                        .serviceTask("C", taskC -> taskC.zeebeJobType("C"))
                        .intermediateThrowEvent("compensation-throw-event")
                        .compensateEventDefinition()
                        .compensateEventDefinitionDone()
                        .connectTo("join")
                        .endEvent()
                        .done())
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        completeJobs(processInstanceKey, "A", 3);
        final List<Long> jobKeysOfB = RecordingExporter.jobRecords(JobIntent.CREATED)
                .withProcessInstanceKey(processInstanceKey)
                .withType("B")
                .limit(3L)
                .map(job -> job.getKey())
                .toList();
        // Only one B job is completed before C, so two subprocess instances are still unfinished
        // when the compensation is thrown.
        ENGINE.job().withKey(jobKeysOfB.getFirst()).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("C").complete();
        jobKeysOfB.stream().skip(1L).forEach(jobKey -> ENGINE.job().withKey(jobKey).complete());

        // then
        Assertions.assertThat(RecordingExporter.processInstanceRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limitToProcessInstanceCompleted())
                .extracting(rec -> rec.getValue().getElementId(), rec -> rec.getIntent())
                .containsSubsequence(
                        Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                        Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED))
                .doesNotContain(Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Builds a multi-instance subprocess (input collection {@code [1,2,3]}) whose instances each run
     * task {@code A} (compensation handler {@code Undo-A}), a compensation throw event, and an end
     * event that maps {@code loopCounter} to {@code completed}.
     *
     * <p>The {@code Undo-A} jobs are completed in the order second, third, first. Each
     * {@code Undo-A} completion must be followed by a throw event completion, and the
     * {@code completed} variable values must appear in the order 2, 3, 1.
     */
    @Test
    public void shouldCompleteThrowEventThatTriggeredCompensationHandler() {
        // given
        ENGINE.deployment()
                .withXmlResource(Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .subProcess("subprocess", subprocess -> subprocess
                                .multiInstance(loop -> loop.zeebeInputCollectionExpression("[1,2,3]"))
                                .embeddedSubProcess()
                                .startEvent()
                                .serviceTask("A", taskA -> taskA
                                        .zeebeJobType("A")
                                        .boundaryEvent()
                                        .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                                .intermediateThrowEvent(
                                        "compensation-throw-event", throwEvent -> throwEvent.compensateEventDefinition())
                                .endEvent()
                                .zeebeOutputExpression("loopCounter", "completed"))
                        .endEvent()
                        .done())
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        completeJobs(processInstanceKey, "A", 3);
        final List<Long> undoJobKeys = RecordingExporter.jobRecords(JobIntent.CREATED)
                .withProcessInstanceKey(processInstanceKey)
                .withType("Undo-A")
                .limit(3L)
                .map(job -> job.getKey())
                .toList();
        // Complete the handlers out of creation order: second, third, then first.
        ENGINE.job().withKey(undoJobKeys.get(1)).complete();
        ENGINE.job().withKey(undoJobKeys.get(2)).complete();
        ENGINE.job().withKey(undoJobKeys.get(0)).complete();

        // then
        Assertions.assertThat(RecordingExporter.processInstanceRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limitToProcessInstanceCompleted())
                .extracting(rec -> rec.getValue().getElementId(), rec -> rec.getIntent())
                .containsSubsequence(
                        Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED));

        Assertions.assertThat(RecordingExporter.records()
                        .limitToProcessInstance(processInstanceKey)
                        .variableRecords())
                .extracting(rec -> rec.getValue())
                .extracting(variable -> variable.getName(), variable -> variable.getValue())
                .containsSubsequence(
                        Assertions.tuple("loopCounter", "1"),
                        Assertions.tuple("loopCounter", "2"),
                        Assertions.tuple("loopCounter", "3"),
                        Assertions.tuple("completed", "2"),
                        Assertions.tuple("completed", "3"),
                        Assertions.tuple("completed", "1"));
    }

    /**
     * Builds a process with task {@code A} whose compensation handler {@code Undo-A} has the input
     * mapping {@code x + 1 -> y}, followed by a compensation end event. The instance is created
     * with {@code x = 1}.
     *
     * <p>After the subscription is TRIGGERED, a variable must be created between the ACTIVATING and
     * ACTIVATED records. Variable {@code y} must have value {@code 2} and be scoped to the activated
     * {@code Undo-A} element.
     */
    @Test
    public void shouldApplyInputMappingsOfCompensationHandler() {
        // given
        ENGINE.deployment()
                .withXmlResource(Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask("A", taskA -> taskA
                                .zeebeJobType("A")
                                .boundaryEvent()
                                .compensation(handler -> handler
                                        .serviceTask("Undo-A")
                                        .zeebeJobType("Undo-A")
                                        .zeebeInputExpression("x + 1", "y")))
                        .endEvent()
                        .compensateEventDefinition()
                        .done())
                .deploy();
        final long processInstanceKey =
                ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("x", 1).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        // then
        Assertions.assertThat(RecordingExporter.records().limitToProcessInstance(processInstanceKey))
                .extracting(rec -> rec.getValueType(), rec -> rec.getIntent())
                .containsSubsequence(
                        Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.TRIGGERED),
                        Assertions.tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_ACTIVATING),
                        Assertions.tuple(ValueType.VARIABLE, VariableIntent.CREATED),
                        Assertions.tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_ACTIVATED));

        // No raw Record cast: the typed stream lets getValue() return the variable record value.
        io.camunda.zeebe.protocol.record.Assertions.assertThat(
                        RecordingExporter.variableRecords(VariableIntent.CREATED)
                                .withProcessInstanceKey(processInstanceKey)
                                .withName("y")
                                .getFirst()
                                .getValue())
                .hasScopeKey(
                        RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                                .withProcessInstanceKey(processInstanceKey)
                                .withElementId("Undo-A")
                                .getFirst()
                                .getKey())
                .hasValue("2");
    }

    /**
     * Builds a process with task {@code A} whose compensation handler {@code Undo-A} has the output
     * mapping {@code x + 1 -> y}, followed by a compensation end event. The {@code Undo-A} job is
     * completed with {@code x = 1}.
     *
     * <p>After the subscription is TRIGGERED, a variable must be created between the COMPLETING and
     * COMPLETED records, followed by the subscription's COMPLETED record. Variable {@code y} must
     * have value {@code 2} and be scoped to the process instance.
     */
    @Test
    public void shouldApplyOutputMappingsOfCompensationHandler() {
        // given
        ENGINE.deployment()
                .withXmlResource(Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask("A", taskA -> taskA
                                .zeebeJobType("A")
                                .boundaryEvent()
                                .compensation(handler -> handler
                                        .serviceTask("Undo-A")
                                        .zeebeJobType("Undo-A")
                                        .zeebeOutputExpression("x + 1", "y")))
                        .endEvent()
                        .compensateEventDefinition()
                        .done())
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").withVariable("x", 1).complete();

        // then
        Assertions.assertThat(RecordingExporter.records().limitToProcessInstance(processInstanceKey))
                .extracting(rec -> rec.getValueType(), rec -> rec.getIntent())
                .containsSubsequence(
                        Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.TRIGGERED),
                        Assertions.tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_COMPLETING),
                        Assertions.tuple(ValueType.VARIABLE, VariableIntent.CREATED),
                        Assertions.tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.COMPLETED));

        // No raw Record cast: the typed stream lets getValue() return the variable record value.
        io.camunda.zeebe.protocol.record.Assertions.assertThat(
                        RecordingExporter.variableRecords(VariableIntent.CREATED)
                                .withProcessInstanceKey(processInstanceKey)
                                .withName("y")
                                .getFirst()
                                .getValue())
                .hasScopeKey(processInstanceKey)
                .hasValue("2");
    }

    /**
     * Builds a subprocess with the input mapping {@code 0 -> local} that contains task {@code A}
     * (compensation handler {@code Undo-A}) and a compensation end event. The {@code Undo-A} job is
     * completed with {@code local = 1} and {@code global = 2}.
     *
     * <p>The variable records must be exactly: {@code local = 0} and {@code local = 1} in the
     * subprocess scope, then {@code global = 2} in the process instance scope.
     */
    @Test
    public void shouldPropagateVariablesOfCompensationHandler() {
        // given
        ENGINE.deployment()
                .withXmlResource(Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .subProcess("subprocess")
                        .zeebeInputExpression("0", "local")
                        .embeddedSubProcess()
                        .startEvent()
                        .serviceTask("A", taskA -> taskA
                                .zeebeJobType("A")
                                .boundaryEvent()
                                .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                        .endEvent()
                        .compensateEventDefinition()
                        .subProcessDone()
                        .endEvent()
                        .done())
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job()
                .ofInstance(processInstanceKey)
                .withType("Undo-A")
                .withVariables(Map.ofEntries(Map.entry("local", 1), Map.entry("global", 2)))
                .complete();

        // then
        final long subprocessKey = RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withProcessInstanceKey(processInstanceKey)
                .withElementId("subprocess")
                .getFirst()
                .getKey();

        Assertions.assertThat(RecordingExporter.records()
                        .limitToProcessInstance(processInstanceKey)
                        .variableRecords())
                .extracting(rec -> rec.getValue())
                .extracting(
                        variable -> variable.getScopeKey(),
                        variable -> variable.getName(),
                        variable -> variable.getValue())
                .containsExactly(
                        Assertions.tuple(subprocessKey, "local", "0"),
                        Assertions.tuple(subprocessKey, "local", "1"),
                        Assertions.tuple(processInstanceKey, "global", "2"));
    }

    /**
     * Deploys {@code /compensation/compensation-multi-instance-activity.bpmn}, completes three
     * compensable activity jobs and one compensation handler job, and expects the process instance
     * to complete.
     *
     * <p>Within the first three compensation subscription records, {@code CompensationHandler} must
     * be TRIGGERED and then COMPLETED.
     */
    @Test
    public void shouldTriggerCompensationForMultiInstanceActivityOnlyOnce() {
        // given
        ENGINE.deployment()
                .withXmlResource(
                        createModelFromClasspathResource("/compensation/compensation-multi-instance-activity.bpmn"))
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        completeJobs(processInstanceKey, SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY, 3);
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then
        Assertions.assertThat(RecordingExporter.compensationSubscriptionRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limit(3L))
                .extracting(rec -> rec.getIntent(), rec -> rec.getValue().getCompensationHandlerId())
                .containsSubsequence(
                        Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                        Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"));

        Assertions.assertThat(RecordingExporter.processInstanceRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limitToProcessInstanceCompleted())
                .extracting(rec -> rec.getValue().getBpmnElementType(), rec -> rec.getIntent())
                .containsSubsequence(Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Deploys {@code /compensation/compensation-multi-instance-activity-parallel.bpmn}, completes all
     * three compensable activity jobs, then the {@code activity} job, then the compensation handler
     * job, and expects the process instance to complete.
     *
     * <p>Within the first three compensation subscription records, {@code CompensationHandler} must
     * be CREATED, TRIGGERED and COMPLETED in that order.
     */
    @Test
    public void shouldTriggerCompensationAfterAllMultiInstanceActivitiesAreCompleted() {
        // given
        ENGINE.deployment()
                .withXmlResource(createModelFromClasspathResource(
                        "/compensation/compensation-multi-instance-activity-parallel.bpmn"))
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        completeJobs(processInstanceKey, SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY, 3);
        completeJobs(processInstanceKey, "activity", 1);
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE_COMPENSATION_HANDLER).complete();

        // then
        Assertions.assertThat(RecordingExporter.compensationSubscriptionRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limit(3L))
                .extracting(rec -> rec.getIntent(), rec -> rec.getValue().getCompensationHandlerId())
                .containsSubsequence(
                        Assertions.tuple(CompensationSubscriptionIntent.CREATED, "CompensationHandler"),
                        Assertions.tuple(CompensationSubscriptionIntent.TRIGGERED, "CompensationHandler"),
                        Assertions.tuple(CompensationSubscriptionIntent.COMPLETED, "CompensationHandler"));

        Assertions.assertThat(RecordingExporter.processInstanceRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limitToProcessInstanceCompleted())
                .extracting(rec -> rec.getValue().getBpmnElementType(), rec -> rec.getIntent())
                .containsSubsequence(Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Deploys {@code /compensation/compensation-multi-instance-activity-parallel.bpmn} and completes
     * only the first of three compensable activity jobs before the {@code activity} job; the other
     * two are completed afterwards.
     *
     * <p>The compensation subscription must be CREATED and later DELETED, a process instance
     * ELEMENT_COMPLETED record must follow, and no subscription may be TRIGGERED.
     */
    @Test
    public void shouldNotTriggerCompensationIfMultiInstanceActivitiesAreNotCompleted() {
        // given
        ENGINE.deployment()
                .withXmlResource(createModelFromClasspathResource(
                        "/compensation/compensation-multi-instance-activity-parallel.bpmn"))
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        final List<Long> compensableJobKeys = RecordingExporter.jobRecords(JobIntent.CREATED)
                .withProcessInstanceKey(processInstanceKey)
                .withType(SERVICE_TASK_TYPE_COMPENSABLE_ACTIVITY)
                .limit(3L)
                .map(job -> job.getKey())
                .toList();
        // Two compensable jobs are still open when the "activity" job is completed.
        ENGINE.job().withKey(compensableJobKeys.getFirst()).complete();
        completeJobs(processInstanceKey, "activity", 1);
        compensableJobKeys.stream().skip(1L).forEach(jobKey -> ENGINE.job().withKey(jobKey).complete());

        // then
        Assertions.assertThat(RecordingExporter.records().limitToProcessInstance(processInstanceKey))
                .extracting(rec -> rec.getValueType(), rec -> rec.getIntent())
                .containsSubsequence(
                        Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.CREATED),
                        Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.DELETED),
                        Assertions.tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_COMPLETED))
                .doesNotContain(
                        Assertions.tuple(ValueType.COMPENSATION_SUBSCRIPTION, CompensationSubscriptionIntent.TRIGGERED));
    }

    /**
     * Builds a process with parallel tasks {@code A} and {@code B}, each with its own compensation
     * handler, joined before an intermediate compensation throw event that references activity
     * {@code A}.
     *
     * <p>{@code Undo-A} must complete between the activation and completion of the throw event,
     * the process instance must complete, and {@code Undo-B} must never be activated.
     */
    @Test
    public void shouldTriggerCompensationForActivityOnIntermediateThrowEvent() {
        // given
        ENGINE.deployment()
                .withXmlResource(Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .parallelGateway("fork")
                        .serviceTask("A", taskA -> taskA
                                .zeebeJobType("A")
                                .boundaryEvent()
                                .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                        .parallelGateway("join")
                        .moveToNode("fork")
                        .serviceTask("B", taskB -> taskB
                                .zeebeJobType("B")
                                .boundaryEvent()
                                .compensation(handler -> handler.serviceTask("Undo-B").zeebeJobType("Undo-B")))
                        .connectTo("join")
                        .intermediateThrowEvent("compensation-throw-event")
                        .compensateEventDefinition()
                        .activityRef("A")
                        .done())
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        // then
        Assertions.assertThat(RecordingExporter.processInstanceRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limitToProcessInstanceCompleted())
                .extracting(rec -> rec.getValue().getElementId(), rec -> rec.getIntent())
                .containsSubsequence(
                        Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                        Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED))
                .doesNotContain(Assertions.tuple("Undo-B", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Builds a process with parallel tasks {@code A} and {@code B}, each with its own compensation
     * handler, joined before a compensation end event that references activity {@code A}.
     *
     * <p>{@code Undo-A} must complete between the activation and completion of the end event, the
     * process instance must complete, and {@code Undo-B} must never be activated.
     */
    @Test
    public void shouldTriggerCompensationForActivityOnEndEvent() {
        // given
        ENGINE.deployment()
                .withXmlResource(Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .parallelGateway("fork")
                        .serviceTask("A", taskA -> taskA
                                .zeebeJobType("A")
                                .boundaryEvent()
                                .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                        .parallelGateway("join")
                        .moveToNode("fork")
                        .serviceTask("B", taskB -> taskB
                                .zeebeJobType("B")
                                .boundaryEvent()
                                .compensation(handler -> handler.serviceTask("Undo-B").zeebeJobType("Undo-B")))
                        .connectTo("join")
                        .endEvent("compensation-throw-event")
                        .compensateEventDefinition()
                        .activityRef("A")
                        .done())
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        // then
        Assertions.assertThat(RecordingExporter.processInstanceRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limitToProcessInstanceCompleted())
                .extracting(rec -> rec.getValue().getElementId(), rec -> rec.getIntent())
                .containsSubsequence(
                        Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                        Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED))
                .doesNotContain(Assertions.tuple("Undo-B", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Builds a process that loops task {@code A} through the exclusive gateway {@code loop} until
     * {@code iteration > 1}, then reaches a compensation throw event that references activity
     * {@code A}. {@code iteration} starts at 0 and each completion of {@code A} increments it.
     *
     * <p>Two {@code A} jobs and two {@code Undo-A} jobs are completed. Both {@code Undo-A}
     * completions must fall between the activation and completion of the throw event.
     */
    @Test
    public void shouldTriggerCompensationForActivityTheSameAmountAsExecuted() {
        // given
        ENGINE.deployment()
                .withXmlResource(Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .zeebeOutputExpression("0", "iteration")
                        .serviceTask("A", taskA -> taskA
                                .zeebeJobType("A")
                                .zeebeOutputExpression("iteration + 1", "iteration")
                                .boundaryEvent()
                                .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                        .exclusiveGateway("loop")
                        .defaultFlow()
                        .connectTo("A")
                        .moveToNode("loop")
                        .conditionExpression("iteration > 1")
                        .intermediateThrowEvent(
                                "compensation-throw-event",
                                throwEvent -> throwEvent.compensateEventDefinition().activityRef("A"))
                        .endEvent()
                        .done())
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        completeJobs(processInstanceKey, "A", 2);
        completeJobs(processInstanceKey, "Undo-A", 2);

        // then
        Assertions.assertThat(RecordingExporter.processInstanceRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limitToProcessInstanceCompleted())
                .extracting(rec -> rec.getValue().getElementId(), rec -> rec.getIntent())
                .containsSubsequence(
                        Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                        Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Builds a process where task {@code A} (compensation handler {@code Undo-A}) is followed by a
     * parallel fork into two compensation throw events that both reference activity {@code A}.
     *
     * <p>After a single {@code Undo-A} job is completed, the process instance must complete and
     * {@code Undo-A} must have been activated exactly once.
     */
    @Test
    public void shouldNotTriggerCompensationForActivityAgain() {
        // given
        ENGINE.deployment()
                .withXmlResource(Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .serviceTask("A", taskA -> taskA
                                .zeebeJobType("A")
                                .boundaryEvent()
                                .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                        .parallelGateway("fork")
                        .intermediateThrowEvent(
                                "compensation-throw-event-1",
                                throwEvent -> throwEvent.compensateEventDefinition().activityRef("A"))
                        .moveToNode("fork")
                        .intermediateThrowEvent(
                                "compensation-throw-event-2",
                                throwEvent -> throwEvent.compensateEventDefinition().activityRef("A"))
                        .done())
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        // then
        Assertions.assertThat(RecordingExporter.processInstanceRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limitToProcessInstanceCompleted())
                .extracting(rec -> rec.getValue().getElementId(), rec -> rec.getIntent())
                .containsSubsequence(
                        Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("compensation-throw-event-2", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                        Assertions.tuple("compensation-throw-event-1", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                        Assertions.tuple("compensation-throw-event-1", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("compensation-throw-event-2", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED))
                .containsOnlyOnce(Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Builds a process that forks into task {@code A} (compensation handler {@code Undo-A}) and
     * task {@code B} followed by a compensation throw event that references activity {@code A}.
     *
     * <p>{@code B} is completed before {@code A}. The throw event must activate and complete
     * between the activation and the completion of {@code A}, and {@code Undo-A} must never be
     * activated.
     */
    @Test
    public void shouldNotTriggerCompensationForActivityIfActive() {
        // given
        ENGINE.deployment()
                .withXmlResource(Bpmn.createExecutableProcess(PROCESS_ID)
                        .startEvent()
                        .parallelGateway("fork")
                        .serviceTask("A", taskA -> taskA
                                .zeebeJobType("A")
                                .boundaryEvent()
                                .compensation(handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                        .moveToNode("fork")
                        .serviceTask("B", taskB -> taskB.zeebeJobType("B"))
                        .intermediateThrowEvent(
                                "compensation-throw-event",
                                throwEvent -> throwEvent.compensateEventDefinition().activityRef("A"))
                        .done())
                .deploy();
        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        // B is completed first, so the compensation is thrown while A's job is still open.
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();

        // then
        Assertions.assertThat(RecordingExporter.processInstanceRecords()
                        .withProcessInstanceKey(processInstanceKey)
                        .limitToProcessInstanceCompleted())
                .extracting(rec -> rec.getValue().getElementId(), rec -> rec.getIntent())
                .containsSubsequence(
                        Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                        Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                        Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED))
                .doesNotContain(Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Verifies that a compensation throw event referencing the embedded subprocess invokes the
     * compensation handler of the task inside it ("Undo-A"), and that the handler attached to the
     * sibling task "B" outside the subprocess ("Undo-B") is never activated.
     */
    @Test
    public void shouldTriggerCompensationForSubprocess() {
        // given: a subprocess containing task A and a parallel task B, each with its own handler
        final BpmnModelInstance process =
            Bpmn.createExecutableProcess(PROCESS_ID)
                .startEvent()
                .parallelGateway("fork")
                .subProcess("subprocess")
                .embeddedSubProcess()
                .startEvent()
                .serviceTask(
                    "A",
                    task ->
                        task.zeebeJobType("A")
                            .boundaryEvent()
                            .compensation(
                                handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .subProcessDone()
                .parallelGateway("join")
                .moveToNode("fork")
                .serviceTask(
                    "B",
                    task ->
                        task.zeebeJobType("B")
                            .boundaryEvent()
                            .compensation(
                                handler -> handler.serviceTask("Undo-B").zeebeJobType("Undo-B")))
                .connectTo("join")
                .intermediateThrowEvent("compensation-throw-event")
                .compensateEventDefinition()
                .activityRef("subprocess")
                .done();
        ENGINE.deployment().withXmlResource(process).deploy();

        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when: both branches finish and the triggered handler job is completed
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(r -> r.getValue().getElementId(), Record::getIntent)
            .containsSubsequence(
                Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED))
            .doesNotContain(Assertions.tuple("Undo-B", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Verifies that compensating a multi-instance subprocess (input collection {@code [1,2,3]})
     * completes the inner handler "Undo-A" three times, and that the handler of the sibling task
     * "B" ("Undo-B") is never activated.
     */
    @Test
    public void shouldTriggerCompensationForMultiInstanceSubprocess() {
        // given: a multi-instance subprocess around task A, in parallel with task B
        final BpmnModelInstance process =
            Bpmn.createExecutableProcess(PROCESS_ID)
                .startEvent()
                .parallelGateway("fork")
                .subProcess("subprocess")
                .multiInstance(loop -> loop.zeebeInputCollectionExpression("[1,2,3]"))
                .embeddedSubProcess()
                .startEvent()
                .serviceTask(
                    "A",
                    task ->
                        task.zeebeJobType("A")
                            .boundaryEvent()
                            .compensation(
                                handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .subProcessDone()
                .parallelGateway("join")
                .moveToNode("fork")
                .serviceTask(
                    "B",
                    task ->
                        task.zeebeJobType("B")
                            .boundaryEvent()
                            .compensation(
                                handler -> handler.serviceTask("Undo-B").zeebeJobType("Undo-B")))
                .connectTo("join")
                .intermediateThrowEvent("compensation-throw-event")
                .compensateEventDefinition()
                .activityRef("subprocess")
                .done();
        ENGINE.deployment().withXmlResource(process).deploy();

        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when: one "A" job and one "Undo-A" job is completed per collection item
        completeJobs(processInstanceKey, "A", 3);
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();
        completeJobs(processInstanceKey, "Undo-A", 3);

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(r -> r.getValue().getElementId(), Record::getIntent)
            .containsSubsequence(
                Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED))
            .doesNotContain(Assertions.tuple("Undo-B", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Verifies that when two parallel compensation throw events reference the same completed
     * subprocess, the inner handler "Undo-A" is activated only once and both throw events still
     * complete.
     */
    @Test
    public void shouldNotTriggerCompensationForSubprocessAgain() {
        // given: a subprocess followed by a fork into two throw events targeting that subprocess
        final BpmnModelInstance process =
            Bpmn.createExecutableProcess(PROCESS_ID)
                .startEvent()
                .subProcess("subprocess")
                .embeddedSubProcess()
                .startEvent()
                .serviceTask(
                    "A",
                    task ->
                        task.zeebeJobType("A")
                            .boundaryEvent()
                            .compensation(
                                handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .subProcessDone()
                .parallelGateway("fork")
                .intermediateThrowEvent(
                    "compensation-throw-event-1",
                    event -> event.compensateEventDefinition().activityRef("subprocess"))
                .moveToNode("fork")
                .intermediateThrowEvent(
                    "compensation-throw-event-2",
                    event -> event.compensateEventDefinition().activityRef("subprocess"))
                .done();
        ENGINE.deployment().withXmlResource(process).deploy();

        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(r -> r.getValue().getElementId(), Record::getIntent)
            .containsSubsequence(
                Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event-2", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("compensation-throw-event-1", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("compensation-throw-event-1", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event-2", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED))
            .containsOnlyOnce(Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Verifies that a compensation throw event referencing a subprocess that has not completed yet
     * finishes without activating the inner handler "Undo-A"; the subprocess then completes
     * normally once job "A" is completed.
     */
    @Test
    public void shouldNotTriggerCompensationForSubprocessIfActive() {
        // given: the throw event sits on the branch of task B, parallel to the subprocess
        final BpmnModelInstance process =
            Bpmn.createExecutableProcess(PROCESS_ID)
                .startEvent()
                .parallelGateway("fork")
                .subProcess("subprocess")
                .embeddedSubProcess()
                .startEvent()
                .serviceTask(
                    "A",
                    task ->
                        task.zeebeJobType("A")
                            .boundaryEvent()
                            .compensation(
                                handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .subProcessDone()
                .moveToNode("fork")
                .serviceTask("B", task -> task.zeebeJobType("B"))
                .intermediateThrowEvent(
                    "compensation-throw-event",
                    event -> event.compensateEventDefinition().activityRef("subprocess"))
                .done();
        ENGINE.deployment().withXmlResource(process).deploy();

        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when: B is completed first, so the throw event is reached while A is still pending
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(r -> r.getValue().getElementId(), Record::getIntent)
            .containsSubsequence(
                Assertions.tuple("subprocess", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED))
            .doesNotContain(Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED));
    }

    /**
     * Verifies that an embedded subprocess used as the compensation handler of task "A" is
     * executed (its tasks "B" and "C" complete) when the compensation end event is reached, and
     * that the process completes afterwards.
     */
    @Test
    public void shouldInvokeSubprocessCompensationHandler() {
        // given: the handler is passed inline so the lambda parameter type is inferred from
        // compensation(...) instead of going through a raw Consumer
        final BpmnModelInstance process =
            Bpmn.createExecutableProcess(PROCESS_ID)
                .startEvent()
                .serviceTask(
                    "A",
                    task ->
                        task.zeebeJobType("A")
                            .boundaryEvent()
                            .compensation(
                                handler ->
                                    handler
                                        .subProcess()
                                        .embeddedSubProcess()
                                        .startEvent()
                                        .serviceTask("B", inner -> inner.zeebeJobType("B"))
                                        .serviceTask("C", inner -> inner.zeebeJobType("C"))
                                        .endEvent()))
                .endEvent()
                .compensateEventDefinition()
                .done();
        ENGINE.deployment().withXmlResource(process).deploy();

        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("C").complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                Record::getIntent)
            .containsSubsequence(
                Assertions.tuple(
                    BpmnElementType.END_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple(
                    BpmnElementType.SUB_PROCESS,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(
                    BpmnElementType.SUB_PROCESS,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(
                    BpmnElementType.END_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(
                    BpmnElementType.PROCESS,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Verifies that a compensation handler attached to the boundary of a subprocess (service task
     * "B") is executed when the compensation end event is reached after the subprocess completed.
     */
    @Test
    public void shouldCompensateSubprocess() {
        // given: subprocess with task A inside and compensation handler B on its boundary
        final BpmnModelInstance process =
            Bpmn.createExecutableProcess(PROCESS_ID)
                .startEvent()
                .subProcess(
                    "subprocess",
                    sub ->
                        sub.embeddedSubProcess()
                            .startEvent()
                            .serviceTask("A", task -> task.zeebeJobType("A"))
                            .endEvent())
                .boundaryEvent()
                .compensation(handler -> handler.serviceTask("B").zeebeJobType("B"))
                .moveToActivity("subprocess")
                .endEvent()
                .compensateEventDefinition()
                .done();
        ENGINE.deployment().withXmlResource(process).deploy();

        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(
                r -> r.getValue().getBpmnElementType(),
                r -> r.getValue().getBpmnEventType(),
                Record::getIntent)
            .containsSubsequence(
                Assertions.tuple(
                    BpmnElementType.SUB_PROCESS,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(
                    BpmnElementType.END_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple(
                    BpmnElementType.SERVICE_TASK,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(
                    BpmnElementType.END_EVENT,
                    BpmnEventType.COMPENSATION,
                    ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(
                    BpmnElementType.PROCESS,
                    BpmnEventType.UNSPECIFIED,
                    ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Verifies that when a subprocess has its own compensation handler ("B") and also contains a
     * task with a handler ("Undo-A"), reaching the compensation end event activates and completes
     * both handlers before the end event and the process complete.
     */
    @Test
    public void shouldCompensateSubprocessWithInnerCompensationHandler() {
        // given: handler Undo-A on inner task A, handler B on the subprocess boundary
        final BpmnModelInstance process =
            Bpmn.createExecutableProcess(PROCESS_ID)
                .startEvent()
                .subProcess(
                    "subprocess",
                    sub ->
                        sub.embeddedSubProcess()
                            .startEvent()
                            .serviceTask("A", task -> task.zeebeJobType("A"))
                            .boundaryEvent()
                            .compensation(
                                handler -> handler.serviceTask("Undo-A").zeebeJobType("Undo-A")))
                .boundaryEvent()
                .compensation(handler -> handler.serviceTask("B").zeebeJobType("B"))
                .moveToActivity("subprocess")
                .endEvent("compensation-throw-event")
                .compensateEventDefinition()
                .done();
        ENGINE.deployment().withXmlResource(process).deploy();

        final long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

        // when
        ENGINE.job().ofInstance(processInstanceKey).withType("A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("Undo-A").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("B").complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(processInstanceKey)
                    .limitToProcessInstanceCompleted())
            .extracting(r -> r.getValue().getElementId(), Record::getIntent)
            .containsSubsequence(
                Assertions.tuple("subprocess", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("Undo-A", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("B", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple("compensation-throw-event", ProcessInstanceIntent.ELEMENT_COMPLETED),
                Assertions.tuple(PROCESS_ID, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Reads a BPMN model from a classpath resource looked up through this test class.
     *
     * @param str resource name handed to {@link Class#getResourceAsStream(String)}
     * @return the model returned by {@code Bpmn.readModelFromStream} for that resource
     * @throws IllegalArgumentException if no resource with the given name can be found
     * @throws java.io.UncheckedIOException if closing the resource stream fails
     */
    private BpmnModelInstance createModelFromClasspathResource(String str) {
        // try-with-resources guarantees the stream is released even if parsing throws;
        // a null resource is legal here and is simply not closed.
        try (java.io.InputStream resourceStream = getClass().getResourceAsStream(str)) {
            if (resourceStream == null) {
                // Fail with the resource name instead of passing null on to the parser.
                throw new IllegalArgumentException("Classpath resource not found: " + str);
            }
            return Bpmn.readModelFromStream(resourceStream);
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException("Failed to close classpath resource: " + str, e);
        }
    }

    /**
     * Completes jobs of one type for a process instance, one after another as their
     * {@code CREATED} records are read from the recording exporter.
     *
     * @param j key of the process instance the jobs belong to
     * @param str job type to filter on
     * @param i number of created-job records to take, i.e. how many jobs get completed
     */
    private void completeJobs(long j, String str, int i) {
        RecordingExporter.jobRecords(JobIntent.CREATED)
            .withProcessInstanceKey(j)
            .withType(str)
            .limit(i)
            .map(Record::getKey)
            .forEach(jobKey -> ENGINE.job().withKey(jobKey).complete());
    }
}
