package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.api.EmptyProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.Task;
import io.camunda.zeebe.engine.api.TaskResult;
import io.camunda.zeebe.engine.api.TaskResultBuilder;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.api.records.RecordBatch;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamPlatform;
import io.camunda.zeebe.engine.util.StreamPlatformExtension;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationWithTimeout;

@ExtendWith({StreamPlatformExtension.class})
/* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/ProcessingScheduleServiceIntegrationTest.class */
public class ProcessingScheduleServiceIntegrationTest {
    private static final long TIMEOUT_MILLIS = 2000;
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);
    private static final ProcessInstanceRecord RECORD = Records.processInstance(1);
    private StreamPlatform streamPlatform;
    private ControlledActorClock clock;
    private DummyProcessor dummyProcessor;

    /* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/ProcessingScheduleServiceIntegrationTest$DummyProcessor.class */
    private static final class DummyProcessor implements RecordProcessor {
        private ProcessingScheduleService scheduleService;
        private CountDownLatch replayLatch;

        private DummyProcessor() {
        }

        public void init(RecordProcessorContext recordProcessorContext) {
            this.scheduleService = recordProcessorContext.getScheduleService();
        }

        public boolean accepts(ValueType valueType) {
            return true;
        }

        public void replay(TypedRecord typedRecord) {
            if (this.replayLatch != null) {
                try {
                    this.replayLatch.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        public ProcessingResult process(TypedRecord typedRecord, ProcessingResultBuilder processingResultBuilder) {
            return EmptyProcessingResult.INSTANCE;
        }

        public ProcessingResult onProcessingError(Throwable th, TypedRecord typedRecord, ProcessingResultBuilder processingResultBuilder) {
            return EmptyProcessingResult.INSTANCE;
        }

        public void blockReplay() {
            this.replayLatch = new CountDownLatch(1);
        }

        public void continueReplay() {
            if (this.replayLatch != null) {
                this.replayLatch.countDown();
            }
        }
    }

    /* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/ProcessingScheduleServiceIntegrationTest$DummyTask.class */
    private static final class DummyTask implements Task {
        private DummyTask() {
        }

        public TaskResult execute(TaskResultBuilder taskResultBuilder) {
            return RecordBatch::empty;
        }
    }

    @BeforeEach
    public void before() {
        this.dummyProcessor = new DummyProcessor();
    }

    @AfterEach
    public void clean() {
        this.dummyProcessor.continueReplay();
        this.streamPlatform = null;
    }

    @Test
    public void shouldExecuteScheduledTask() {
        this.streamPlatform.withRecordProcessors(List.of(this.dummyProcessor)).startStreamProcessor();
        DummyTask dummyTask = (DummyTask) Mockito.spy(new DummyTask());
        this.dummyProcessor.scheduleService.runDelayed(Duration.ZERO, dummyTask);
        ((DummyTask) Mockito.verify(dummyTask, TIMEOUT)).execute((TaskResultBuilder) ArgumentMatchers.any());
    }

    @Test
    public void shouldNotExecuteScheduledTaskIfOnReplay() {
        this.dummyProcessor.blockReplay();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        this.streamPlatform.withRecordProcessors(List.of(this.dummyProcessor)).startStreamProcessorNotAwaitOpening();
        DummyTask dummyTask = (DummyTask) Mockito.spy(new DummyTask());
        this.dummyProcessor.scheduleService.runDelayed(Duration.ZERO, dummyTask);
        ((DummyTask) Mockito.verify(dummyTask, Mockito.never())).execute((TaskResultBuilder) ArgumentMatchers.any());
    }

    @Test
    public void shouldNotExecuteTaskWhichAreScheduledDuringReplay() {
        DummyProcessor dummyProcessor = (DummyProcessor) Mockito.spy(this.dummyProcessor);
        dummyProcessor.blockReplay();
        this.streamPlatform.writeBatch(RecordToWrite.command().processInstance(ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD), RecordToWrite.event().processInstance(ProcessInstanceIntent.ELEMENT_ACTIVATING, RECORD).causedBy(0));
        this.streamPlatform.withRecordProcessors(List.of(dummyProcessor)).startStreamProcessorNotAwaitOpening();
        DummyTask dummyTask = (DummyTask) Mockito.spy(new DummyTask());
        dummyProcessor.scheduleService.runDelayed(Duration.ZERO, dummyTask);
        dummyProcessor.continueReplay();
        ((DummyProcessor) Mockito.verify(dummyProcessor, TIMEOUT)).init((RecordProcessorContext) ArgumentMatchers.any());
        ((DummyProcessor) Mockito.verify(dummyProcessor, TIMEOUT)).replay((TypedRecord) ArgumentMatchers.any());
        ((DummyTask) Mockito.verify(dummyTask, Mockito.never())).execute((TaskResultBuilder) ArgumentMatchers.any());
    }

    @Test
    public void shouldNotExecuteScheduledTaskIfSuspended() {
        this.streamPlatform.withRecordProcessors(List.of(this.dummyProcessor)).startStreamProcessor();
        this.streamPlatform.pauseProcessing();
        DummyTask dummyTask = (DummyTask) Mockito.spy(new DummyTask());
        this.dummyProcessor.scheduleService.runDelayed(Duration.ZERO, dummyTask);
        ((DummyTask) Mockito.verify(dummyTask, Mockito.never())).execute((TaskResultBuilder) ArgumentMatchers.any());
    }

    @Test
    public void shouldExecuteScheduledTaskAfterResumed() {
        this.streamPlatform.withRecordProcessors(List.of(this.dummyProcessor)).startStreamProcessor();
        this.streamPlatform.pauseProcessing();
        DummyTask dummyTask = (DummyTask) Mockito.spy(new DummyTask());
        this.dummyProcessor.scheduleService.runDelayed(Duration.ZERO, dummyTask);
        this.streamPlatform.resumeProcessing();
        ((DummyTask) Mockito.verify(dummyTask, TIMEOUT)).execute((TaskResultBuilder) ArgumentMatchers.any());
    }

    @Test
    public void shouldWriteRecordAfterTaskWasExecuted() {
        DummyProcessor dummyProcessor = (DummyProcessor) Mockito.spy(this.dummyProcessor);
        this.streamPlatform.withRecordProcessors(List.of(dummyProcessor)).startStreamProcessor();
        dummyProcessor.scheduleService.runDelayed(Duration.ZERO, taskResultBuilder -> {
            taskResultBuilder.appendCommandRecord(1L, ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
            return taskResultBuilder.build();
        });
        ((DummyProcessor) Mockito.verify(dummyProcessor, TIMEOUT)).process((TypedRecord) Mockito.argThat(typedRecord -> {
            return typedRecord.getKey() == 1;
        }), (ProcessingResultBuilder) ArgumentMatchers.any());
    }

    @Test
    public void shouldScheduleOnFixedRate() {
        DummyProcessor dummyProcessor = (DummyProcessor) Mockito.spy(this.dummyProcessor);
        this.streamPlatform.withRecordProcessors(List.of(dummyProcessor)).startStreamProcessor();
        dummyProcessor.scheduleService.runAtFixedRate(Duration.ofMillis(100L), taskResultBuilder -> {
            taskResultBuilder.appendCommandRecord(1L, ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
            return taskResultBuilder.build();
        });
        ((DummyProcessor) Mockito.verify(dummyProcessor, TIMEOUT.times(5))).process((TypedRecord) Mockito.argThat(typedRecord -> {
            return typedRecord.getKey() == 1;
        }), (ProcessingResultBuilder) ArgumentMatchers.any());
    }
}
