package io.camunda.zeebe.logstreams.impl.log;

import com.netflix.concurrency.limits.limit.AbstractLimit;
import com.netflix.concurrency.limits.limit.WindowedLimit;
import io.camunda.zeebe.dispatcher.BlockPeek;
import io.camunda.zeebe.dispatcher.Subscription;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.impl.backpressure.AlgorithmCfg;
import io.camunda.zeebe.logstreams.impl.backpressure.AppendBackpressureMetrics;
import io.camunda.zeebe.logstreams.impl.backpressure.AppendEntryLimiter;
import io.camunda.zeebe.logstreams.impl.backpressure.AppendLimiter;
import io.camunda.zeebe.logstreams.impl.backpressure.AppenderGradient2Cfg;
import io.camunda.zeebe.logstreams.impl.backpressure.AppenderVegasCfg;
import io.camunda.zeebe.logstreams.impl.backpressure.BackpressureConstants;
import io.camunda.zeebe.logstreams.impl.backpressure.NoopAppendLimiter;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.util.Environment;
import io.camunda.zeebe.util.collection.Tuple;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthStatus;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import io.prometheus.client.Histogram;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/logstreams/impl/log/LogStorageAppender.class */
/**
 * Actor that consumes blocks from a write-buffer {@link Subscription} and appends them to a
 * {@link LogStorage}.
 *
 * <p>Every append first has to pass an {@link AppendLimiter}. Depending on the environment this is
 * either a real limiter built from one of the configured algorithms or a {@link
 * NoopAppendLimiter}.
 */
public class LogStorageAppender extends Actor implements HealthMonitorable {
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;

    /** Algorithm used when none is configured or the configured name is not known. */
    private static final String DEFAULT_ALGORITHM = "vegas";

    /** Supported back pressure algorithms, keyed by their lower-case name. */
    private static final Map<String, AlgorithmCfg> ALGORITHM_CFG =
            Map.of(DEFAULT_ALGORITHM, new AppenderVegasCfg(), "gradient2", new AppenderGradient2Cfg());

    private final String name;
    private final Subscription writeBufferSubscription;
    private final int maxAppendBlockSize;
    private final LogStorage logStorage;
    private final AppendLimiter appendEntryLimiter;
    private final AppendBackpressureMetrics appendBackpressureMetrics;
    private final AppenderMetrics appenderMetrics;
    private final ActorFuture<Void> closeFuture;
    private final int partitionId;
    private final LoggedEventImpl positionReader = new LoggedEventImpl();
    private final Set<FailureListener> failureListeners = new HashSet<>();
    private final Environment env = new Environment();

    /**
     * Creates the appender and selects its limiter.
     *
     * <p>Back pressure is enabled unless {@link BackpressureConstants#ENV_BP_APPENDER} is set to
     * {@code false} in the environment.
     *
     * @param name the actor name returned by {@link #getName()}
     * @param partitionId the partition id, used for metrics, logging and the actor context
     * @param logStorage the storage the blocks are appended to
     * @param writeBufferSubscription the subscription blocks are peeked from
     * @param maxAppendBlockSize the maximum block size passed to {@code peekBlock}
     */
    public LogStorageAppender(
            String name,
            int partitionId,
            LogStorage logStorage,
            Subscription writeBufferSubscription,
            int maxAppendBlockSize) {
        this.appenderMetrics = new AppenderMetrics(Integer.toString(partitionId));
        this.name = name;
        this.partitionId = partitionId;
        this.logStorage = logStorage;
        this.writeBufferSubscription = writeBufferSubscription;
        this.maxAppendBlockSize = maxAppendBlockSize;
        this.appendBackpressureMetrics = new AppendBackpressureMetrics(partitionId);

        final boolean backpressureEnabled =
                this.env.getBool(BackpressureConstants.ENV_BP_APPENDER).orElse(true);
        this.appendEntryLimiter =
                backpressureEnabled ? initBackpressure(partitionId) : initNoBackpressure(partitionId);
        this.closeFuture = new CompletableActorFuture<>();
    }

    /**
     * Builds the limiter from the algorithm configured in the environment.
     *
     * <p>The algorithm name is lower-cased with {@link Locale#ROOT} so that the lookup does not
     * depend on the JVM default locale. Unknown names fall back to a fresh {@link AppenderVegasCfg}.
     * When {@link BackpressureConstants#ENV_BP_APPENDER_WINDOWED} is {@code true} the limit is
     * wrapped in a {@link WindowedLimit}.
     */
    private AppendLimiter initBackpressure(int partitionId) {
        final String algorithmName =
                this.env
                        .get(BackpressureConstants.ENV_BP_APPENDER_ALGORITHM)
                        .orElse(DEFAULT_ALGORITHM)
                        .toLowerCase(Locale.ROOT);
        final AlgorithmCfg algorithmCfg =
                ALGORITHM_CFG.getOrDefault(algorithmName, new AppenderVegasCfg());
        algorithmCfg.applyEnvironment(this.env);

        final AbstractLimit baseLimit = algorithmCfg.get();
        final boolean windowed =
                this.env.getBool(BackpressureConstants.ENV_BP_APPENDER_WINDOWED).orElse(false);

        LOG.debug(
                "Configured log appender back pressure at partition {} as {}. Window limiting is {}",
                partitionId,
                algorithmCfg,
                windowed ? "enabled" : "disabled");

        return AppendEntryLimiter.builder()
                .limit(windowed ? WindowedLimit.newBuilder().build(baseLimit) : baseLimit)
                .partitionId(partitionId)
                .build();
    }

    /** Logs a warning and returns a {@link NoopAppendLimiter}. */
    private AppendLimiter initNoBackpressure(int partitionId) {
        LOG.warn(
                "No back pressure for the log appender (partition = {}) configured! This might cause problems.",
                partitionId);
        return new NoopAppendLimiter();
    }

    /**
     * Tries to append the peeked block to the log storage.
     *
     * @param blockPeek the block to append; only marked completed when the append was issued
     * @return {@code true} if the block was handed to the log storage, {@code false} if the limiter
     *     rejected it
     */
    private boolean appendBlock(BlockPeek blockPeek) {
        final ByteBuffer rawBuffer = blockPeek.getRawBuffer();
        // Copy the remaining bytes into a fresh heap buffer and flip it for reading.
        final ByteBuffer copiedBuffer = ByteBuffer.allocate(rawBuffer.remaining()).put(rawBuffer).flip();
        final Tuple<Long, Long> positions = readLowestHighestPosition(copiedBuffer);
        final Long highestPosition = positions.getRight();

        // Recorded before the limiter is asked, so rejected attempts are counted here as well.
        this.appendBackpressureMetrics.newEntryToAppend();
        if (!this.appendEntryLimiter.tryAcquire(highestPosition)) {
            this.appendBackpressureMetrics.deferred();
            LOG.trace(
                    "Backpressure happens: in flight {} limit {}",
                    this.appendEntryLimiter.getInflight(),
                    this.appendEntryLimiter.getLimit());
            return false;
        }

        final Histogram.Timer appendLatencyTimer = this.appenderMetrics.startAppendLatencyTimer();
        final Histogram.Timer commitLatencyTimer = this.appenderMetrics.startCommitLatencyTimer();
        this.logStorage.append(
                positions.getLeft(),
                highestPosition,
                copiedBuffer,
                new Listener(this, highestPosition, appendLatencyTimer, commitLatencyTimer));
        blockPeek.markCompleted();
        return true;
    }

    /** Extends the inherited context with this appender's {@code partitionId}. */
    @Override
    protected Map<String, String> createContext() {
        final Map<String, String> context = super.createContext();
        context.put("partitionId", Integer.toString(this.partitionId));
        return context;
    }

    /** @return the name given at construction */
    public String getName() {
        return this.name;
    }

    /** Registers {@link #onWriteBufferAvailable()} as consumer of the write-buffer subscription. */
    protected void onActorStarting() {
        this.actor.consume(this.writeBufferSubscription, this::onWriteBufferAvailable);
    }

    /** Completes the close future. */
    protected void onActorClosed() {
        this.closeFuture.complete(null);
    }

    /**
     * Requests the actor to close unless it is already closed.
     *
     * @return the future that this class completes in {@link #onActorClosed()} and {@link
     *     #onActorFailed()}
     */
    @Override
    public ActorFuture<Void> closeAsync() {
        if (!this.actor.isClosed()) {
            super.closeAsync();
        }
        return this.closeFuture;
    }

    /** Routes the failure to {@link #onFailure(Throwable)}. */
    protected void handleFailure(Throwable failure) {
        onFailure(failure);
    }

    /** Completes the close future (without an error), same as {@link #onActorClosed()}. */
    public void onActorFailed() {
        this.closeFuture.complete(null);
    }

    /**
     * Peeks the next block and tries to append it; yields the actor thread when nothing was peeked
     * or the append was rejected by the limiter.
     */
    private void onWriteBufferAvailable() {
        final BlockPeek blockPeek = new BlockPeek();
        final boolean appended =
                this.writeBufferSubscription.peekBlock(blockPeek, this.maxAppendBlockSize, true) > 0
                        && appendBlock(blockPeek);
        if (!appended) {
            this.actor.yieldThread();
        }
    }

    /**
     * Scans all entries in the buffer and returns their lowest and highest position.
     *
     * <p>The first entry is read unconditionally, so the buffer is expected to hold at least one
     * entry.
     *
     * @param buffer the buffer holding the entries, read from offset 0 up to its capacity
     * @return a tuple of (lowest position, highest position)
     */
    private Tuple<Long, Long> readLowestHighestPosition(ByteBuffer buffer) {
        final DirectBuffer view = new UnsafeBuffer(buffer);
        long lowest = Long.MAX_VALUE;
        long highest = Long.MIN_VALUE;
        int offset = 0;
        do {
            this.positionReader.wrap(view, offset);
            final long position = this.positionReader.getPosition();
            lowest = Math.min(lowest, position);
            highest = Math.max(highest, position);
            // Advance by the length the reader reports for the entry just wrapped.
            offset += this.positionReader.getLength();
        } while (offset < view.capacity());
        return new Tuple<>(lowest, highest);
    }

    /** @return {@code UNHEALTHY} once the actor is closed, {@code HEALTHY} otherwise */
    public HealthStatus getHealthStatus() {
        return this.actor.isClosed() ? HealthStatus.UNHEALTHY : HealthStatus.HEALTHY;
    }

    /** Adds the listener; the change is submitted via {@code actor.run}. */
    public void addFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.add(failureListener));
    }

    /** Removes the listener; the change is submitted via {@code actor.run}. */
    public void removeFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.remove(failureListener));
    }

    /** Logs the error, fails the actor and then notifies every registered failure listener. */
    private void onFailure(Throwable failure) {
        // The trailing throwable is not bound to a placeholder, so SLF4J logs it as the exception.
        LOG.error("Actor {} failed in phase {}.", this.name, this.actor.getLifecyclePhase(), failure);
        this.actor.fail();
        this.failureListeners.forEach(FailureListener::onFailure);
    }

    /**
     * Submits {@link #onFailure(Throwable)} via {@code actor.run}.
     *
     * @param failure the error to report
     */
    public void runOnFailure(Throwable failure) {
        this.actor.run(() -> onFailure(failure));
    }

    /**
     * Submits an {@code onCommit} call for the given position to the limiter via {@code actor.run}.
     *
     * @param highestPosition the position passed to the limiter's {@code onCommit}
     */
    public void releaseBackPressure(long highestPosition) {
        this.actor.run(() -> this.appendEntryLimiter.onCommit(highestPosition));
    }

    /**
     * Records the last appended position and closes the given timer, via {@code actor.run}.
     *
     * @param position the position to record
     * @param appendLatencyTimer the timer to close
     */
    public void notifyWritePosition(long position, Histogram.Timer appendLatencyTimer) {
        this.actor.run(() -> {
            this.appenderMetrics.setLastAppendedPosition(position);
            appendLatencyTimer.close();
        });
    }

    /**
     * Records the last committed position and closes the given timer, via {@code actor.run}.
     *
     * @param position the position to record
     * @param commitLatencyTimer the timer to close
     */
    public void notifyCommitPosition(long position, Histogram.Timer commitLatencyTimer) {
        this.actor.run(() -> {
            this.appenderMetrics.setLastCommittedPosition(position);
            commitLatencyTimer.close();
        });
    }
}
