package io.camunda.zeebe.engine.util;

import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.db.ZeebeDbFactory;
import io.camunda.zeebe.engine.Loggers;
import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.EmptyProcessingResult;
import io.camunda.zeebe.engine.api.InterPartitionCommandSender;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.logstreams.impl.log.LoggedEventImpl;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.ListLogStorage;
import io.camunda.zeebe.logstreams.util.SyncLogStream;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorBuilder;
import io.camunda.zeebe.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.streamprocessor.TestScheduledCommandCache;
import io.camunda.zeebe.streamprocessor.state.DbLastProcessedPositionState;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.agrona.DirectBuffer;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/engine/util/StreamPlatform.class */
public final class StreamPlatform {
    private static final String SNAPSHOT_FOLDER = "snapshot";
    private static final Logger LOG = Loggers.STREAM_PROCESSING;
    private static final int DEFAULT_PARTITION = 1;
    private static final String STREAM_NAME = "stream-";
    private final Path dataDirectory;
    private final List<AutoCloseable> closeables;
    private final ActorScheduler actorScheduler;
    private LogContext logContext;
    private ProcessorContext processorContext;
    private List<RecordProcessor> recordProcessors;
    private final RecordProcessor defaultMockedRecordProcessor;
    private final ZeebeDbFactory zeebeDbFactory;
    private final StreamProcessorLifecycleAware mockProcessorLifecycleAware;
    private final StreamProcessorListener mockStreamProcessorListener;
    private TestScheduledCommandCache.TestCommandCache scheduledCommandCache;
    private boolean snapshotWasTaken = false;
    private final StreamProcessorMode streamProcessorMode = StreamProcessorMode.PROCESSING;
    private final WriteActor writeActor = new WriteActor();
    private int maxCommandsInBatch = 1;
    private final CommandResponseWriter mockCommandResponseWriter = (CommandResponseWriter) Mockito.mock(CommandResponseWriter.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/engine/util/StreamPlatform$LogContext.class */
    public static final class LogContext extends Record implements AutoCloseable {
        private final SynchronousLogStream logStream;

        private LogContext(SynchronousLogStream synchronousLogStream) {
            this.logStream = synchronousLogStream;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.logStream.close();
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, LogContext.class), LogContext.class, "logStream", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$LogContext;->logStream:Lio/camunda/zeebe/logstreams/util/SynchronousLogStream;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, LogContext.class), LogContext.class, "logStream", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$LogContext;->logStream:Lio/camunda/zeebe/logstreams/util/SynchronousLogStream;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, LogContext.class, Object.class), LogContext.class, "logStream", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$LogContext;->logStream:Lio/camunda/zeebe/logstreams/util/SynchronousLogStream;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public SynchronousLogStream logStream() {
            return this.logStream;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext.class */
    public static final class ProcessorContext extends Record implements AutoCloseable {
        private final StreamProcessor streamProcessor;
        private final ZeebeDb zeebeDb;
        private final Path runtimePath;
        private final Path snapshotPath;

        private ProcessorContext(StreamProcessor streamProcessor, ZeebeDb zeebeDb, Path path, Path path2) {
            this.streamProcessor = streamProcessor;
            this.zeebeDb = zeebeDb;
            this.runtimePath = path;
            this.snapshotPath = path2;
        }

        public void snapshot() {
            this.zeebeDb.createSnapshot(this.snapshotPath.toFile());
        }

        public Long getLastSuccessfulProcessedRecordPosition() {
            return Long.valueOf(new DbLastProcessedPositionState(this.zeebeDb, this.zeebeDb.createContext()).getLastSuccessfulProcessedRecordPosition());
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            if (this.streamProcessor.isClosed()) {
                return;
            }
            StreamPlatform.LOG.debug("Close stream processor");
            this.streamProcessor.closeAsync().join();
            this.zeebeDb.close();
            if (this.runtimePath.toFile().exists()) {
                FileUtil.deleteFolder(this.runtimePath);
            }
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, ProcessorContext.class), ProcessorContext.class, "streamProcessor;zeebeDb;runtimePath;snapshotPath", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext;->streamProcessor:Lio/camunda/zeebe/streamprocessor/StreamProcessor;", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext;->zeebeDb:Lio/camunda/zeebe/db/ZeebeDb;", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext;->runtimePath:Ljava/nio/file/Path;", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext;->snapshotPath:Ljava/nio/file/Path;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, ProcessorContext.class), ProcessorContext.class, "streamProcessor;zeebeDb;runtimePath;snapshotPath", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext;->streamProcessor:Lio/camunda/zeebe/streamprocessor/StreamProcessor;", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext;->zeebeDb:Lio/camunda/zeebe/db/ZeebeDb;", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext;->runtimePath:Ljava/nio/file/Path;", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext;->snapshotPath:Ljava/nio/file/Path;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, ProcessorContext.class, Object.class), ProcessorContext.class, "streamProcessor;zeebeDb;runtimePath;snapshotPath", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext;->streamProcessor:Lio/camunda/zeebe/streamprocessor/StreamProcessor;", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext;->zeebeDb:Lio/camunda/zeebe/db/ZeebeDb;", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext;->runtimePath:Ljava/nio/file/Path;", "FIELD:Lio/camunda/zeebe/engine/util/StreamPlatform$ProcessorContext;->snapshotPath:Ljava/nio/file/Path;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public StreamProcessor streamProcessor() {
            return this.streamProcessor;
        }

        public ZeebeDb zeebeDb() {
            return this.zeebeDb;
        }

        public Path runtimePath() {
            return this.runtimePath;
        }

        public Path snapshotPath() {
            return this.snapshotPath;
        }
    }

    /* loaded from: input_file:io/camunda/zeebe/engine/util/StreamPlatform$WriteActor.class */
    private static final class WriteActor extends Actor {
        private WriteActor() {
        }

        public ActorFuture<Long> submit(Callable<Long> callable) {
            return this.actor.call(callable);
        }
    }

    public StreamPlatform(Path path, List<AutoCloseable> list, ActorScheduler actorScheduler, ZeebeDbFactory zeebeDbFactory) {
        this.dataDirectory = path;
        this.closeables = list;
        this.actorScheduler = actorScheduler;
        this.zeebeDbFactory = zeebeDbFactory;
        Mockito.when(this.mockCommandResponseWriter.intent((Intent) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.key(ArgumentMatchers.anyLong())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.partitionId(ArgumentMatchers.anyInt())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.recordType((RecordType) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.rejectionType((RejectionType) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.rejectionReason((DirectBuffer) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.valueType((ValueType) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.valueWriter((BufferWriter) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        actorScheduler.submitActor(this.writeActor);
        this.defaultMockedRecordProcessor = (RecordProcessor) Mockito.mock(RecordProcessor.class);
        Mockito.when(this.defaultMockedRecordProcessor.process((TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(EmptyProcessingResult.INSTANCE);
        Mockito.when(this.defaultMockedRecordProcessor.onProcessingError((Throwable) ArgumentMatchers.any(), (TypedRecord) ArgumentMatchers.any(), (ProcessingResultBuilder) ArgumentMatchers.any())).thenReturn(EmptyProcessingResult.INSTANCE);
        Mockito.when(Boolean.valueOf(this.defaultMockedRecordProcessor.accepts((ValueType) ArgumentMatchers.any()))).thenReturn(true);
        this.recordProcessors = List.of(this.defaultMockedRecordProcessor);
        list.add(() -> {
            this.recordProcessors.clear();
        });
        this.mockProcessorLifecycleAware = (StreamProcessorLifecycleAware) Mockito.mock(StreamProcessorLifecycleAware.class);
        this.mockStreamProcessorListener = (StreamProcessorListener) Mockito.mock(StreamProcessorListener.class);
    }

    public void resetMockInvocations() {
        Mockito.clearInvocations(new Object[]{this.mockCommandResponseWriter, this.mockProcessorLifecycleAware, this.mockStreamProcessorListener, this.defaultMockedRecordProcessor});
    }

    public CommandResponseWriter getMockCommandResponseWriter() {
        return this.mockCommandResponseWriter;
    }

    public void setMaxCommandsInBatch(int i) {
        this.maxCommandsInBatch = i;
    }

    public void createLogStream() {
        ListLogStorage listLogStorage = new ListLogStorage();
        SyncLogStream build = SyncLogStream.builder().withLogName("stream-1").withLogStorage(listLogStorage).withPartitionId(1).withActorSchedulingService(this.actorScheduler).build();
        Objects.requireNonNull(build);
        listLogStorage.setPositionListener(build::setLastWrittenPosition);
        this.logContext = new LogContext(build);
        this.closeables.add(this.logContext);
    }

    public SynchronousLogStream getLogStream() {
        return this.logContext.logStream();
    }

    public Stream<LoggedEvent> events() {
        AutoCloseable newLogStreamReader = getLogStream().newLogStreamReader();
        this.closeables.add(newLogStreamReader);
        newLogStreamReader.seekToFirstEvent();
        Iterable iterable = () -> {
            return newLogStreamReader;
        };
        return StreamSupport.stream(iterable.spliterator(), false).map(loggedEvent -> {
            LoggedEventImpl loggedEventImpl = (LoggedEventImpl) loggedEvent;
            DirectBuffer cloneBuffer = BufferUtil.cloneBuffer(loggedEventImpl.getBuffer(), loggedEventImpl.getFragmentOffset(), loggedEventImpl.getLength());
            LoggedEventImpl loggedEventImpl2 = new LoggedEventImpl();
            loggedEventImpl2.wrap(cloneBuffer, 0);
            return loggedEventImpl2;
        });
    }

    public Path createRuntimeFolder(SynchronousLogStream synchronousLogStream) {
        Path resolve = this.dataDirectory.resolve(synchronousLogStream.getLogName()).resolve("state");
        try {
            Files.createDirectories(resolve, new FileAttribute[0]);
        } catch (FileAlreadyExistsException e) {
        } catch (IOException e2) {
            throw new UncheckedIOException(e2);
        }
        return resolve.resolve("runtime");
    }

    public StreamPlatform withRecordProcessors(List<RecordProcessor> list) {
        this.recordProcessors = list;
        return this;
    }

    public StreamProcessorListener getMockStreamProcessorListener() {
        return this.mockStreamProcessorListener;
    }

    public StreamProcessor startStreamProcessor() {
        return buildStreamProcessor(getLogStream(), true, StreamProcessorMode.PROCESSING);
    }

    public StreamProcessor startStreamProcessorInReplayOnlyMode() {
        return buildStreamProcessor(getLogStream(), false, StreamProcessorMode.REPLAY);
    }

    public StreamProcessor startStreamProcessorNotAwaitOpening() {
        return buildStreamProcessor(getLogStream(), false, StreamProcessorMode.PROCESSING);
    }

    public StreamProcessorLifecycleAware getMockProcessorLifecycleAware() {
        return this.mockProcessorLifecycleAware;
    }

    public TestScheduledCommandCache.TestCommandCache scheduledCommandCache() {
        return this.scheduledCommandCache;
    }

    public StreamProcessor buildStreamProcessor(SynchronousLogStream synchronousLogStream, boolean z, StreamProcessorMode streamProcessorMode) {
        Path createRuntimeFolder = createRuntimeFolder(synchronousLogStream);
        Path resolve = createRuntimeFolder.getParent().resolve(SNAPSHOT_FOLDER);
        this.scheduledCommandCache = new TestScheduledCommandCache.TestCommandCache();
        ZeebeDb createDb = this.snapshotWasTaken ? this.zeebeDbFactory.createDb(resolve.toFile()) : this.zeebeDbFactory.createDb(createRuntimeFolder.toFile());
        StreamProcessorBuilder partitionCommandSender = StreamProcessor.builder().logStream(synchronousLogStream.getAsyncLogStream()).zeebeDb(createDb).actorSchedulingService(this.actorScheduler).commandResponseWriter(this.mockCommandResponseWriter).recordProcessors(this.recordProcessors).eventApplierFactory(EventAppliers::new).streamProcessorMode(streamProcessorMode).listener(this.mockStreamProcessorListener).maxCommandsInBatch(this.maxCommandsInBatch).scheduledCommandCache(this.scheduledCommandCache).partitionCommandSender((InterPartitionCommandSender) Mockito.mock(InterPartitionCommandSender.class));
        partitionCommandSender.getLifecycleListeners().add(this.mockProcessorLifecycleAware);
        StreamProcessor build = partitionCommandSender.build();
        ActorFuture openAsync = build.openAsync(false);
        if (z) {
            ((StreamProcessorLifecycleAware) Mockito.verify(this.mockProcessorLifecycleAware, Mockito.timeout(15000L))).onRecovered((ReadonlyStreamProcessorContext) ArgumentMatchers.any());
        }
        openAsync.join(15L, TimeUnit.SECONDS);
        this.processorContext = new ProcessorContext(build, createDb, createRuntimeFolder, resolve);
        this.closeables.add(this.processorContext);
        return build;
    }

    public void pauseProcessing() {
        this.processorContext.streamProcessor.pauseProcessing().join();
        LOG.info("Paused processing for processor {}", this.processorContext.streamProcessor.getName());
    }

    public void resumeProcessing() {
        this.processorContext.streamProcessor.resumeProcessing();
        LOG.info("Resume processing for processor {}", this.processorContext.streamProcessor.getName());
    }

    public void snapshot() {
        this.processorContext.snapshot();
        this.snapshotWasTaken = true;
        LOG.info("Snapshot database for processor {}", this.processorContext.streamProcessor.getName());
    }

    public long getLastSuccessfulProcessedRecordPosition() {
        return this.processorContext.getLastSuccessfulProcessedRecordPosition().longValue();
    }

    public RecordProcessor getDefaultMockedRecordProcessor() {
        return this.defaultMockedRecordProcessor;
    }

    public StreamProcessor getStreamProcessor() {
        return (StreamProcessor) Optional.ofNullable(this.processorContext).map(processorContext -> {
            return processorContext.streamProcessor;
        }).orElseThrow(() -> {
            return new NoSuchElementException("No stream processor found.");
        });
    }

    public LogStreamBatchWriter setupBatchWriter(RecordToWrite[] recordToWriteArr) {
        LogStreamBatchWriter newLogStreamBatchWriter = getLogStream().newLogStreamBatchWriter();
        for (RecordToWrite recordToWrite : recordToWriteArr) {
            newLogStreamBatchWriter.event().key(recordToWrite.getKey()).sourceIndex(recordToWrite.getSourceIndex()).metadataWriter(recordToWrite.getRecordMetadata()).valueWriter(recordToWrite.getUnifiedRecordValue()).done();
        }
        return newLogStreamBatchWriter;
    }

    public long writeBatch(RecordToWrite... recordToWriteArr) {
        LogStreamBatchWriter logStreamBatchWriter = setupBatchWriter(recordToWriteArr);
        WriteActor writeActor = this.writeActor;
        Objects.requireNonNull(logStreamBatchWriter);
        return ((Long) writeActor.submit(logStreamBatchWriter::tryWrite).join()).longValue();
    }

    public void closeStreamProcessor() throws Exception {
        this.processorContext.close();
    }

    public ZeebeDb getZeebeDb() {
        return this.processorContext.zeebeDb;
    }
}
