package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.engine.Loggers;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.api.Task;
import io.camunda.zeebe.engine.api.TaskResult;
import io.camunda.zeebe.engine.api.TaskResultBuilder;
import io.camunda.zeebe.engine.api.records.RecordBatch;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.test.util.junit.RegressionTest;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import org.agrona.LangUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationWithTimeout;

/* loaded from: input_file:io/camunda/zeebe/streamprocessor/ProcessingScheduleServiceTest.class */
public class ProcessingScheduleServiceTest {
    // Upper bound, in milliseconds, handed to Mockito.timeout(...) for the shared TIMEOUT mode.
    private static final long TIMEOUT_MILLIS = 2000;
    // Shared verification mode for interactions that happen asynchronously on the actor thread.
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);
    // Record value appended by the tasks in the write-related tests.
    private static final ProcessInstanceRecord RECORD = Records.processInstance(1);
    // Clock installed into the actor scheduler in before(); one test advances it via addTime(...).
    private ControlledActorClock clock;
    // Scheduler created and started in before(), closed in clean().
    private ActorScheduler actorScheduler;
    // Supplies the stream processor phase and the abort flag to the service under test.
    private LifecycleSupplier lifecycleSupplier;
    // Supplies the (mocked) log stream batch writer future to the service under test.
    private WriterAsyncSupplier writerAsyncSupplier;
    // Actor wrapper around the service under test; tests schedule tasks through it.
    private TestScheduleServiceActorDecorator scheduleService;

    /**
     * Minimal {@link Task} used as a Mockito spy target: tests only verify whether and in which
     * order {@code execute} is invoked. The returned result is backed by {@code RecordBatch::empty}.
     */
    private static final class DummyTask implements Task {

        public TaskResult execute(final TaskResultBuilder taskResultBuilder) {
            // The builder is intentionally ignored; nothing is appended by this task.
            return RecordBatch::empty;
        }
    }

    /**
     * Test double that serves two roles at once: as a {@link Supplier} it reports the current
     * stream processor phase, and as a {@link BooleanSupplier} it reports the abort flag.
     *
     * <p>Both fields are volatile and are written directly by the tests.
     */
    private static final class LifecycleSupplier
            implements Supplier<StreamProcessor.Phase>, BooleanSupplier {

        volatile StreamProcessor.Phase currentPhase = StreamProcessor.Phase.PROCESSING;
        volatile boolean isAborted = false;

        /** Returns the phase most recently assigned to {@code currentPhase}. */
        @Override
        public StreamProcessor.Phase get() {
            return currentPhase;
        }

        /** Returns the current value of {@code isAborted}. */
        @Override
        public boolean getAsBoolean() {
            return isAborted;
        }
    }

    /**
     * Actor that wraps a {@link ProcessingScheduleServiceImpl}. Every scheduling call is forwarded
     * to the wrapped service from inside a job submitted to this actor, and the wrapped service is
     * opened with this actor's control while the actor is starting.
     */
    private static final class TestScheduleServiceActorDecorator extends Actor
            implements ProcessingScheduleService {

        private final ProcessingScheduleServiceImpl processingScheduleService;

        public TestScheduleServiceActorDecorator(
                final ProcessingScheduleServiceImpl processingScheduleServiceImpl) {
            this.processingScheduleService = processingScheduleServiceImpl;
        }

        /** Submits a job to the actor that forwards the runnable to the wrapped service. */
        public void runDelayed(final Duration duration, final Runnable runnable) {
            actor.submit(() -> processingScheduleService.runDelayed(duration, runnable));
        }

        /** Submits a job to the actor that forwards the task to the wrapped service. */
        public void runDelayed(final Duration duration, final Task task) {
            actor.submit(() -> processingScheduleService.runDelayed(duration, task));
        }

        /** Submits a job to the actor that forwards the fixed-rate task to the wrapped service. */
        public void runAtFixedRate(final Duration duration, final Task task) {
            actor.submit(() -> processingScheduleService.runAtFixedRate(duration, task));
        }

        /**
         * Opens the wrapped service and blocks the current actor phase until that completes; if
         * opening reports an error, the actor is failed with it.
         */
        protected void onActorStarting() {
            final var opened = processingScheduleService.open(actor);
            actor.runOnCompletionBlockingCurrentPhase(opened, (ignoredResult, failure) -> {
                if (failure == null) {
                    return;
                }
                actor.fail(failure);
            });
        }
    }

    /**
     * Supplies the writer future to the service under test. It starts out as an already completed
     * future holding a mocked {@link LogStreamBatchWriter}; tests may swap the future through
     * {@code writerFutureRef}.
     */
    private static final class WriterAsyncSupplier
            implements Supplier<ActorFuture<LogStreamBatchWriter>> {

        AtomicReference<ActorFuture<LogStreamBatchWriter>> writerFutureRef =
                new AtomicReference<>(
                        CompletableActorFuture.completed(Mockito.mock(LogStreamBatchWriter.class)));

        /** Returns whichever future is currently stored in {@code writerFutureRef}. */
        @Override
        public ActorFuture<LogStreamBatchWriter> get() {
            return writerFutureRef.get();
        }
    }

    /**
     * Builds and starts a fresh actor scheduler driven by a controlled clock, then wires a new
     * schedule service (wrapped in the test actor) and submits it to the scheduler.
     */
    @BeforeEach
    public void before() {
        clock = new ControlledActorClock();

        // Leave two processors free, but always use at least one CPU-bound thread.
        final int cpuBoundThreads = Math.max(1, Runtime.getRuntime().availableProcessors() - 2);
        actorScheduler = ActorScheduler.newActorScheduler()
                .setCpuBoundActorThreadCount(cpuBoundThreads)
                .setIoBoundActorThreadCount(2)
                .setActorClock(clock)
                .build();
        actorScheduler.start();

        lifecycleSupplier = new LifecycleSupplier();
        writerAsyncSupplier = new WriterAsyncSupplier();

        final ProcessingScheduleServiceImpl delegate =
                new ProcessingScheduleServiceImpl(lifecycleSupplier, lifecycleSupplier, writerAsyncSupplier);
        scheduleService = new TestScheduleServiceActorDecorator(delegate);
        actorScheduler.submitActor(scheduleService);
    }

    /**
     * Closes the actor scheduler created in {@code before()} and drops the reference to it.
     *
     * <p>Any exception raised while closing is rethrown unchecked so the test is reported as
     * failed.
     */
    @AfterEach
    public void clean() {
        // @AfterEach still runs when @BeforeEach failed before the scheduler was assigned; bail
        // out here so a NullPointerException does not obscure the original setup failure.
        if (this.actorScheduler == null) {
            return;
        }
        try {
            this.actorScheduler.close();
        } catch (final Exception e) {
            LangUtil.rethrowUnchecked(e);
        }
        this.actorScheduler = null;
    }

    /** A task scheduled with zero delay is executed within the verification timeout. */
    @Test
    public void shouldExecuteScheduledTask() {
        // given
        final DummyTask spiedTask = Mockito.spy(new DummyTask());

        // when
        scheduleService.runDelayed(Duration.ZERO, spiedTask);

        // then
        Mockito.verify(spiedTask, TIMEOUT).execute(ArgumentMatchers.any());
    }

    /** Two zero-delay tasks are executed in the order in which they were scheduled. */
    @Test
    public void shouldExecuteScheduledTaskInRightOrder() {
        // given
        final DummyTask firstTask = Mockito.spy(new DummyTask());
        final DummyTask secondTask = Mockito.spy(new DummyTask());

        // when
        scheduleService.runDelayed(Duration.ZERO, firstTask);
        scheduleService.runDelayed(Duration.ZERO, secondTask);

        // then
        final InOrder executionOrder = Mockito.inOrder(firstTask, secondTask);
        executionOrder.verify(firstTask, TIMEOUT).execute(ArgumentMatchers.any());
        executionOrder.verify(secondTask, TIMEOUT).execute(ArgumentMatchers.any());
        executionOrder.verifyNoMoreInteractions();
    }

    /** A scheduled task must not be executed while the reported phase is not PROCESSING. */
    @Test
    public void shouldNotExecuteScheduledTaskIfNotInProcessingPhase() {
        // given
        this.lifecycleSupplier.currentPhase = StreamProcessor.Phase.INITIAL;
        final DummyTask dummyTask = Mockito.spy(new DummyTask());

        // when
        this.scheduleService.runDelayed(Duration.ZERO, dummyTask);

        // then
        // The task is handed to the actor asynchronously, so an immediate never() check could
        // pass before the actor has done anything. after(...) keeps observing for the whole
        // window and fails if the task is executed at any point during it.
        Mockito.verify(dummyTask, Mockito.after(200L).never()).execute(ArgumentMatchers.any());
    }

    /** A scheduled task must not be executed while the abort flag is set. */
    @Test
    public void shouldNotExecuteScheduledTaskIfAborted() {
        // given
        this.lifecycleSupplier.isAborted = true;
        final DummyTask dummyTask = Mockito.spy(new DummyTask());

        // when
        this.scheduleService.runDelayed(Duration.ZERO, dummyTask);

        // then
        // Scheduling goes through the actor asynchronously; observe for a short window instead
        // of checking once right away, otherwise the assertion can pass before anything ran.
        Mockito.verify(dummyTask, Mockito.after(200L).never()).execute(ArgumentMatchers.any());
    }

    /**
     * A task scheduled while the phase is PAUSED is held back and is executed once the phase
     * switches to PROCESSING.
     */
    @Test
    public void shouldExecuteScheduledTaskInProcessing() {
        // given
        this.lifecycleSupplier.currentPhase = StreamProcessor.Phase.PAUSED;
        final DummyTask dummyTask = Mockito.spy(new DummyTask());

        // when
        this.scheduleService.runDelayed(Duration.ZERO, dummyTask);

        // then
        // Observe for a short window: an immediate never() check could pass merely because the
        // actor has not yet picked up the asynchronously submitted job.
        Mockito.verify(dummyTask, Mockito.after(200L).never()).execute(ArgumentMatchers.any());

        // when the phase changes, the held-back task must eventually run
        this.lifecycleSupplier.currentPhase = StreamProcessor.Phase.PROCESSING;
        Mockito.verify(dummyTask, TIMEOUT).execute(ArgumentMatchers.any());
    }

    /**
     * Scheduling directly on a service instance that was never opened by an actor must not
     * execute the task.
     */
    @Test
    public void shouldNotExecuteTasksWhenScheduledOnClosedActor() {
        // given
        lifecycleSupplier.currentPhase = StreamProcessor.Phase.PAUSED;
        final ProcessingScheduleServiceImpl unopenedService =
                new ProcessingScheduleServiceImpl(lifecycleSupplier, lifecycleSupplier, writerAsyncSupplier);
        final DummyTask spiedTask = Mockito.spy(new DummyTask());

        // when
        unopenedService.runDelayed(Duration.ZERO, spiedTask);

        // then
        Mockito.verify(spiedTask, Mockito.never()).execute(ArgumentMatchers.any());
    }

    /**
     * When the writer future is completed exceptionally, joining the start-up future of a newly
     * submitted actor throws with the original message.
     */
    @Test
    public void shouldFailActorIfWriterCantBeRetrieved() {
        // given
        writerAsyncSupplier.writerFutureRef.set(
                CompletableActorFuture.completedExceptionally(new RuntimeException("expected")));
        final TestScheduleServiceActorDecorator failingActor =
                new TestScheduleServiceActorDecorator(
                        new ProcessingScheduleServiceImpl(lifecycleSupplier, lifecycleSupplier, writerAsyncSupplier));

        // when
        final ActorFuture<?> startup = actorScheduler.submitActor(failingActor);

        // then
        Assertions.assertThatThrownBy(startup::join).hasMessageContaining("expected");
    }

    /**
     * A task that appends a command record leads to an entry with the given key being built on
     * the writer, followed by a {@code tryWrite()}.
     */
    @Test
    public void shouldWriteRecordAfterTaskWasExecuted() {
        // given
        final LogStreamBatchWriter batchWriter = writerAsyncSupplier.get().join();
        final LogStreamBatchWriter.LogEntryBuilder entryBuilder =
                Mockito.mock(LogStreamBatchWriter.LogEntryBuilder.class, Mockito.RETURNS_DEEP_STUBS);
        Mockito.when(batchWriter.canWriteAdditionalEvent(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()))
                .thenReturn(true);
        Mockito.when(batchWriter.event()).thenReturn(entryBuilder);

        // when
        scheduleService.runDelayed(Duration.ZERO, builder -> {
            builder.appendCommandRecord(1L, ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
            return builder.build();
        });

        // then
        Mockito.verify(batchWriter, TIMEOUT).event();
        Mockito.verify(entryBuilder, TIMEOUT).key(1L);
        Mockito.verify(batchWriter, TIMEOUT).tryWrite();
    }

    /**
     * Regression test: the record of the first task must be written (including all of its
     * {@code tryWrite()} retries) before the record of the second task, even though the first
     * task moves the clock forward far enough for the second task to become due.
     */
    @RegressionTest("https://github.com/camunda/zeebe/issues/10240")
    public void shouldPreserveOrderingOfWritesEvenWithRetries() {
        // given
        // Number of tryWrite() calls made for the first record; all but the last one fail.
        final int tryWritesForFirstRecord = 5000;
        final LogStreamBatchWriter batchWriter = writerAsyncSupplier.get().join();
        final LogStreamBatchWriter.LogEntryBuilder entryBuilder =
                Mockito.mock(LogStreamBatchWriter.LogEntryBuilder.class, Mockito.RETURNS_DEEP_STUBS);
        Mockito.when(batchWriter.canWriteAdditionalEvent(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()))
                .thenReturn(true);
        Mockito.when(batchWriter.event()).thenReturn(entryBuilder);

        final AtomicInteger tryWriteCalls = new AtomicInteger(0);
        Mockito.when(batchWriter.tryWrite()).then(invocation -> {
            final int attempt = tryWriteCalls.incrementAndGet();
            if (attempt >= tryWritesForFirstRecord) {
                Loggers.PROCESS_PROCESSOR_LOGGER.debug("End tryWrite loop");
                return 0L;
            }
            // A negative position makes the write count as failed.
            return -1L;
        });

        // when
        scheduleService.runDelayed(Duration.ofMinutes(1L), laterBuilder -> {
            Loggers.PROCESS_PROCESSOR_LOGGER.debug("Running second timer");
            laterBuilder.appendCommandRecord(2L, ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
            return laterBuilder.build();
        });
        scheduleService.runDelayed(Duration.ZERO, earlierBuilder -> {
            Loggers.PROCESS_PROCESSOR_LOGGER.debug("Running first timer");
            // Advance the clock by the delay of the other task while this one is still running.
            clock.addTime(Duration.ofMinutes(1L));
            earlierBuilder.appendCommandRecord(1L, ProcessInstanceIntent.ACTIVATE_ELEMENT, RECORD);
            return earlierBuilder.build();
        });

        // then
        final InOrder writeOrder = Mockito.inOrder(batchWriter, entryBuilder);
        writeOrder.verify(batchWriter, TIMEOUT).event();
        writeOrder.verify(entryBuilder, TIMEOUT).key(1L);
        writeOrder.verify(batchWriter, TIMEOUT.times(tryWritesForFirstRecord)).tryWrite();
        writeOrder.verify(batchWriter, TIMEOUT).event();
        writeOrder.verify(entryBuilder, TIMEOUT).key(2L);
        writeOrder.verify(batchWriter, TIMEOUT).tryWrite();
        writeOrder.verifyNoMoreInteractions();
    }

    /** A task scheduled at a fixed rate of 10 ms runs at least five times within the timeout. */
    @Test
    public void shouldScheduleOnFixedRate() {
        // given
        final DummyTask spiedTask = Mockito.spy(new DummyTask());
        final Duration rate = Duration.ofMillis(10L);

        // when
        scheduleService.runAtFixedRate(rate, spiedTask);

        // then
        Mockito.verify(spiedTask, TIMEOUT.atLeast(5)).execute(ArgumentMatchers.any());
    }

    /** A task that is still pending when the schedule service actor is closed must never run. */
    @Test
    public void shouldNotRunScheduledTasksAfterClosed() {
        // given
        final DummyTask dummyTask = Mockito.spy(new DummyTask());
        this.scheduleService.runDelayed(Duration.ofMillis(200L), dummyTask);

        // when
        this.scheduleService.close();

        // then
        // The task is due only after 200 ms, so an immediate never() check would pass no matter
        // what close() does. Observe well past the scheduled delay to make the assertion
        // meaningful.
        Mockito.verify(dummyTask, Mockito.after(500L).never()).execute(ArgumentMatchers.any());
    }
}
