package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.api.Task;
import io.camunda.zeebe.engine.api.TaskResult;
import io.camunda.zeebe.engine.api.TaskResultBuilder;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import java.time.Duration;
import org.agrona.collections.MutableInteger;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/message/MessageTimeToLiveChecker.class */
public final class MessageTimeToLiveChecker implements Task {
    private final Duration executionInterval;
    private final int batchLimit;
    private final boolean enableMessageTtlCheckerAsync;
    private final ProcessingScheduleService scheduleService;
    private final MessageState messageState;
    private final MessageRecord emptyDeleteMessageCommand = new MessageRecord().setName("").setCorrelationKey("").setTimeToLive(-1);
    private long currentTimestamp = -1;
    private MessageState.Index lastIndex = null;

    public MessageTimeToLiveChecker(Duration duration, int i, boolean z, ProcessingScheduleService processingScheduleService, MessageState messageState) {
        this.executionInterval = duration;
        this.batchLimit = i;
        this.enableMessageTtlCheckerAsync = z;
        this.messageState = messageState;
        this.scheduleService = processingScheduleService;
    }

    @Override // io.camunda.zeebe.engine.api.Task
    public TaskResult execute(TaskResultBuilder taskResultBuilder) {
        if (this.currentTimestamp == -1) {
            this.currentTimestamp = ActorClock.currentTimeMillis();
        }
        MutableInteger mutableInteger = new MutableInteger(0);
        if (this.messageState.visitMessagesWithDeadlineBeforeTimestamp(this.currentTimestamp, this.lastIndex, (j, j2) -> {
            MessageState.Index index = new MessageState.Index(j2, j);
            boolean equals = index.equals(this.lastIndex);
            this.lastIndex = index;
            if (equals) {
                return true;
            }
            return taskResultBuilder.appendCommandRecord(j2, MessageIntent.EXPIRE, this.emptyDeleteMessageCommand) && mutableInteger.incrementAndGet() < this.batchLimit;
        })) {
            reschedule(Duration.ZERO);
        } else {
            this.lastIndex = null;
            this.currentTimestamp = -1L;
            reschedule(this.executionInterval);
        }
        return taskResultBuilder.build();
    }

    private void reschedule(Duration duration) {
        if (this.enableMessageTtlCheckerAsync) {
            this.scheduleService.runDelayedAsync(duration, this);
        } else {
            this.scheduleService.runDelayed(duration, this);
        }
    }
}
