package io.camunda.zeebe.engine.util;

import io.camunda.zeebe.db.ZeebeDbFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.state.mutable.MutableProcessingState;
import io.camunda.zeebe.engine.util.TestStreams;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import io.camunda.zeebe.stream.impl.StreamProcessorListener;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;

/**
 * Test helper that bundles a {@link TestStreams} instance, a default partition id and a
 * {@link ZeebeDbFactory}, and exposes convenience methods to start, pause, resume, snapshot and
 * close stream processors as well as to write command records.
 *
 * <p>Every partition id is mapped to a log name via {@link #getLogName(int)}; all calls into
 * {@link TestStreams} use that name.
 *
 * <p>All record writes are funnelled through a private {@link WriteActor} and the calling thread
 * blocks on the returned future until the write has completed.
 */
public class StreamProcessingComposite {
    /** Prefix of every log name produced by {@link #getLogName(int)}. */
    private static final String STREAM_NAME = "stream-";
    private final TestStreams streams;
    /** Partition used by all methods that do not take an explicit partition id. */
    private final int partitionId;
    private final ZeebeDbFactory<?> zeebeDbFactory;
    /**
     * Processing state taken from the most recent {@link TypedRecordProcessorContext} handed to a
     * processor factory; {@code null} until a factory has been invoked.
     *
     * <p>NOTE(review): this field is assigned inside the factory callback and read through
     * {@link #getProcessingState()}. If the callback runs on a different thread than the reader,
     * visibility is not guaranteed because the field is not volatile - confirm.
     */
    private MutableProcessingState processingState;
    private final WriteActor writeActor = new WriteActor();

    /**
     * Factory used by tests to register processors on a pre-built {@link TypedRecordProcessors}
     * instance.
     */
    @FunctionalInterface
    public interface StreamProcessorTestFactory {
        /**
         * @param typedRecordProcessors processors instance created by the composite
         * @param typedRecordProcessorContext context the stream processor was started with
         * @return the processors to install
         */
        TypedRecordProcessors build(TypedRecordProcessors typedRecordProcessors, TypedRecordProcessorContext typedRecordProcessorContext);
    }

    /** Actor whose only purpose is to execute submitted write callables via {@code actor.call}. */
    private static final class WriteActor extends Actor {
        private WriteActor() {
        }

        /**
         * Schedules the given callable on this actor.
         *
         * @param callable the write to perform
         * @return future completed with the callable's result
         */
        public ActorFuture<Long> submit(Callable<Long> callable) {
            return this.actor.call(callable);
        }
    }

    /**
     * Creates the composite and submits its internal write actor to the given scheduler, blocking
     * until the submission future has completed.
     *
     * @param testStreams streams all operations are delegated to
     * @param partitionId default partition for methods without an explicit partition argument
     * @param zeebeDbFactory database factory passed on when a stream processor is started
     * @param actorScheduler scheduler the internal write actor is submitted to
     */
    public StreamProcessingComposite(TestStreams testStreams, int partitionId, ZeebeDbFactory<?> zeebeDbFactory, ActorScheduler actorScheduler) {
        this.streams = testStreams;
        this.partitionId = partitionId;
        this.zeebeDbFactory = zeebeDbFactory;
        actorScheduler.submitActor(this.writeActor).join();
    }

    /** Returns the log stream registered under {@link #getLogName(int)} for the given partition. */
    public SynchronousLogStream getLogStream(int partitionId) {
        return this.streams.getLogStream(getLogName(partitionId));
    }

    /** Creates a new writer for the log registered under {@link #getLogName(int)}. */
    public LogStreamWriter newLogStreamWriter(int partitionId) {
        return this.streams.newLogStreamWriter(getLogName(partitionId));
    }

    /**
     * Starts a stream processor on the default partition, letting the given test factory populate
     * a freshly created {@link TypedRecordProcessors} instance.
     *
     * @param streamProcessorTestFactory factory that registers the processors under test
     * @param streamProcessorListenerOpt listener passed through unchanged to {@link TestStreams}
     * @return the started stream processor
     */
    public StreamProcessor startTypedStreamProcessor(StreamProcessorTestFactory streamProcessorTestFactory, Optional<StreamProcessorListener> streamProcessorListenerOpt) {
        // Explicitly typed so that it is obvious which overload receives the adapter.
        final TypedRecordProcessorFactory adapter =
                context -> createTypedRecordProcessors(streamProcessorTestFactory, context);
        return startTypedStreamProcessor(adapter, streamProcessorListenerOpt);
    }

    /**
     * Remembers the context's processing state, builds an empty processors instance from the
     * state's key generator and the context's writers, and hands both to the test factory.
     */
    private TypedRecordProcessors createTypedRecordProcessors(StreamProcessorTestFactory streamProcessorTestFactory, TypedRecordProcessorContext typedRecordProcessorContext) {
        this.processingState = typedRecordProcessorContext.getProcessingState();
        final TypedRecordProcessors emptyProcessors =
                TypedRecordProcessors.processors(this.processingState.getKeyGenerator(), typedRecordProcessorContext.getWriters());
        return streamProcessorTestFactory.build(emptyProcessors, typedRecordProcessorContext);
    }

    /**
     * Starts a stream processor on the default partition.
     *
     * @param typedRecordProcessorFactory factory creating the processors
     * @param streamProcessorListenerOpt listener passed through unchanged to {@link TestStreams}
     * @return the started stream processor
     */
    public StreamProcessor startTypedStreamProcessor(TypedRecordProcessorFactory typedRecordProcessorFactory, Optional<StreamProcessorListener> streamProcessorListenerOpt) {
        return startTypedStreamProcessor(this.partitionId, typedRecordProcessorFactory, streamProcessorListenerOpt);
    }

    /**
     * Starts a stream processor on the given partition. The processing state of the context passed
     * to the factory is captured and made available through {@link #getProcessingState()}.
     *
     * @param partitionId partition whose log the processor is started on
     * @param typedRecordProcessorFactory factory creating the processors
     * @param streamProcessorListenerOpt listener passed through unchanged to {@link TestStreams}
     * @return the started stream processor
     */
    public StreamProcessor startTypedStreamProcessor(int partitionId, TypedRecordProcessorFactory typedRecordProcessorFactory, Optional<StreamProcessorListener> streamProcessorListenerOpt) {
        return this.streams.startStreamProcessor(getLogName(partitionId), this.zeebeDbFactory, context -> {
            // Capture the state before delegating so it is available once the factory returns.
            this.processingState = context.getProcessingState();
            return typedRecordProcessorFactory.createProcessors(context);
        }, streamProcessorListenerOpt);
    }

    /** Delegates to {@code TestStreams.pauseProcessing} for the given partition's log. */
    public void pauseProcessing(int partitionId) {
        this.streams.pauseProcessing(getLogName(partitionId));
    }

    /** Delegates to {@code TestStreams.resumeProcessing} for the given partition's log. */
    public void resumeProcessing(int partitionId) {
        this.streams.resumeProcessing(getLogName(partitionId));
    }

    /** Delegates to {@code TestStreams.snapshot} for the given partition's log. */
    public void snapshot(int partitionId) {
        this.streams.snapshot(getLogName(partitionId));
    }

    /**
     * Closes the stream processor of the given partition.
     *
     * @param partitionId partition whose processor is closed
     * @throws RuntimeException wrapping any exception raised while closing
     */
    public void closeStreamProcessor(int partitionId) {
        try {
            this.streams.closeProcessor(getLogName(partitionId));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the stream processor registered for the given partition's log. */
    public StreamProcessor getStreamProcessor(int partitionId) {
        return this.streams.getStreamProcessor(getLogName(partitionId));
    }

    /**
     * @return the processing state captured when a processor factory was last invoked, or
     *     {@code null} if no factory has been invoked yet
     */
    public MutableProcessingState getProcessingState() {
        return this.processingState;
    }

    /** Returns a new {@link RecordStream} over the events of the default partition's log. */
    public RecordStream events() {
        return new RecordStream(this.streams.events(getLogName(this.partitionId)));
    }

    /**
     * Writes the given records in one {@code tryWrite} call on the default partition's log and
     * blocks until the write actor has executed it.
     *
     * @param recordsToWrite records to append
     * @return the value carried by the {@code tryWrite} result (NOTE(review): presumably a log
     *     position - confirm against {@link LogStreamWriter})
     */
    public long writeBatch(RecordToWrite... recordsToWrite) {
        final LogStreamWriter writer = this.streams.newLogStreamWriter(getLogName(this.partitionId));
        // Cast retained: the static type of tryWrite(...).get() is not visible from this file.
        return this.writeActor.submit(() -> (Long) writer.tryWrite(Arrays.asList(recordsToWrite)).get()).join();
    }

    /**
     * Writes a command without an explicit key to the given partition.
     *
     * @return the value returned by the record writer's {@code write()}
     */
    public long writeCommandOnPartition(int partitionId, Intent intent, UnifiedRecordValue value) {
        return submitWrite(this.streams.newRecord(getLogName(partitionId))
                .recordType(RecordType.COMMAND)
                .intent(intent)
                .event(value));
    }

    /**
     * Writes a command with the given key to the given partition.
     *
     * @return the value returned by the record writer's {@code write()}
     */
    public long writeCommandOnPartition(int partitionId, long key, Intent intent, UnifiedRecordValue value) {
        return submitWrite(this.streams.newRecord(getLogName(partitionId))
                .key(key)
                .recordType(RecordType.COMMAND)
                .intent(intent)
                .event(value));
    }

    /**
     * Writes a command with the given key to the default partition.
     *
     * @return the value returned by the record writer's {@code write()}
     */
    public long writeCommand(long key, Intent intent, UnifiedRecordValue value) {
        return submitWrite(this.streams.newRecord(getLogName(this.partitionId))
                .recordType(RecordType.COMMAND)
                .key(key)
                .intent(intent)
                .event(value));
    }

    /**
     * Writes a command without an explicit key to the default partition.
     *
     * @return the value returned by the record writer's {@code write()}
     */
    public long writeCommand(Intent intent, UnifiedRecordValue value) {
        return submitWrite(this.streams.newRecord(getLogName(this.partitionId))
                .recordType(RecordType.COMMAND)
                .intent(intent)
                .event(value));
    }

    /**
     * Writes a command carrying request metadata to the default partition.
     *
     * @param requestStreamId value set as the record's request stream id
     * @param requestId value set as the record's request id
     * @return the value returned by the record writer's {@code write()}
     */
    public long writeCommand(int requestStreamId, long requestId, Intent intent, UnifiedRecordValue value) {
        return submitWrite(this.streams.newRecord(getLogName(this.partitionId))
                .recordType(RecordType.COMMAND)
                .requestId(requestId)
                .requestStreamId(requestStreamId)
                .intent(intent)
                .event(value));
    }

    /**
     * Runs the prepared record's {@code write()} on the write actor and blocks for the result.
     * Evaluating {@code recordWriter::write} throws {@link NullPointerException} for a null writer,
     * so no separate null check is required.
     */
    private long submitWrite(TestStreams.FluentLogWriter recordWriter) {
        return this.writeActor.submit(recordWriter::write).join();
    }

    /**
     * Derives the log name of a partition.
     *
     * @param partitionId partition id
     * @return {@code "stream-"} followed by the partition id
     */
    public static String getLogName(int partitionId) {
        return STREAM_NAME + partitionId;
    }
}
