package io.camunda.zeebe.engine.util;

import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.db.ZeebeDbFactory;
import io.camunda.zeebe.engine.Engine;
import io.camunda.zeebe.engine.Loggers;
import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.InterPartitionCommandSender;
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.util.ListLogStorage;
import io.camunda.zeebe.logstreams.util.SyncLogStream;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.CopiedRecord;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.agrona.DirectBuffer;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/engine/util/TestStreams.class */
public final class TestStreams {
    /** Name of the folder, resolved next to the runtime folder, that database snapshots go to. */
    private static final String SNAPSHOT_FOLDER = "snapshot";

    /** Lookup from record value class to its value type; populated by the static initializer. */
    private static final Map<Class<?>, ValueType> VALUE_TYPES = new HashMap<>();

    private static final Logger LOG = Loggers.STREAM_PROCESSING;

    private final TemporaryFolder dataDirectory;
    private final AutoCloseableRule closeables;
    private final ActorScheduler actorScheduler;
    private final StreamProcessorListener mockStreamProcessorListener;

    /** Log contexts keyed by log stream name. */
    private final Map<String, LogContext> logContextMap = new HashMap<>();

    /** Processor contexts keyed by log stream name. */
    private final Map<String, ProcessorContext> streamContextMap = new HashMap<>();

    /** Set once {@link #snapshot(String)} ran; later processors then open the snapshot folder. */
    private boolean snapshotWasTaken;

    private Function<MutableZeebeState, EventApplier> eventApplierFactory = EventAppliers::new;
    private StreamProcessorMode streamProcessorMode = StreamProcessorMode.PROCESSING;
    private final CommandResponseWriter mockCommandResponseWriter = Mockito.mock(CommandResponseWriter.class);

    /**
     * Fluent helper that collects key, metadata and value of a single record and writes it through
     * a {@link LogStreamRecordWriter}.
     */
    public static class FluentLogWriter {
        protected final LogStreamRecordWriter writer;
        protected UnpackedObject value;
        protected final RecordMetadata metadata = new RecordMetadata();
        /** Record key; any negative value makes {@link #write()} write a null key. */
        protected long key = -1;
        private long sourceRecordPosition = -1;

        /**
         * @param logStreamRecordWriter the writer the record is eventually written with
         */
        public FluentLogWriter(LogStreamRecordWriter logStreamRecordWriter) {
            writer = logStreamRecordWriter;
            metadata.protocolVersion(3);
        }

        /**
         * Copies intent, key, source record position, record type and value from an existing
         * record.
         *
         * @param copiedRecord the record to take the properties from
         * @return this writer
         */
        public FluentLogWriter record(CopiedRecord copiedRecord) {
            intent(copiedRecord.getIntent());
            key(copiedRecord.getKey());
            sourceRecordPosition(copiedRecord.getSourceRecordPosition());
            recordType(copiedRecord.getRecordType());
            event(copiedRecord.getValue());
            return this;
        }

        /** Sets the intent in the record metadata and returns this writer. */
        public FluentLogWriter intent(Intent intent) {
            metadata.intent(intent);
            return this;
        }

        /** Sets the request id in the record metadata and returns this writer. */
        public FluentLogWriter requestId(long j) {
            metadata.requestId(j);
            return this;
        }

        /** Remembers the source record position to write and returns this writer. */
        public FluentLogWriter sourceRecordPosition(long j) {
            sourceRecordPosition = j;
            return this;
        }

        /** Sets the request stream id in the record metadata and returns this writer. */
        public FluentLogWriter requestStreamId(int i) {
            metadata.requestStreamId(i);
            return this;
        }

        /** Sets the record type in the record metadata and returns this writer. */
        public FluentLogWriter recordType(RecordType recordType) {
            metadata.recordType(recordType);
            return this;
        }

        /** Remembers the key to write and returns this writer. */
        public FluentLogWriter key(long j) {
            key = j;
            return this;
        }

        /**
         * Sets the record value and derives the metadata value type from the value's class.
         *
         * @param unpackedObject the record value
         * @return this writer
         * @throws RuntimeException if no value type is registered for the value's class
         */
        public FluentLogWriter event(UnpackedObject unpackedObject) {
            final ValueType registeredType = TestStreams.VALUE_TYPES.get(unpackedObject.getClass());
            if (registeredType != null) {
                metadata.valueType(registeredType);
                value = unpackedObject;
                return this;
            }
            throw new RuntimeException("No event type registered for getValue " + unpackedObject.getClass());
        }

        /**
         * Hands the collected properties to the underlying writer and retries {@code tryWrite}
         * until it yields a non-negative result.
         *
         * @return the non-negative value returned by the successful {@code tryWrite} call
         */
        public long write() {
            writer.sourceRecordPosition(sourceRecordPosition);
            if (key < 0) {
                writer.keyNull();
            } else {
                writer.key(key);
            }
            writer.metadataWriter(metadata);
            writer.valueWriter(value);

            final Long writeResult = TestUtil.doRepeatedly(writer::tryWrite).until(result -> result >= 0);
            return writeResult;
        }
    }

    /**
     * Holds a log stream together with one record writer that is created from it up front.
     * Closing the context closes the log stream.
     */
    public static final class LogContext implements AutoCloseable {
        private final SynchronousLogStream logStream;
        private final LogStreamRecordWriter logStreamWriter;

        private LogContext(SynchronousLogStream synchronousLogStream, LogStorage logStorage) {
            // logStorage is accepted for signature compatibility but is not retained.
            logStream = synchronousLogStream;
            logStreamWriter = logStream.newLogStreamRecordWriter();
        }

        /**
         * Creates a context for the given log stream.
         *
         * @param syncLogStream the log stream to wrap
         * @param logStorage the storage of the stream (currently not used by the context)
         * @return the new context
         */
        public static LogContext createLogContext(SyncLogStream syncLogStream, LogStorage logStorage) {
            final LogContext context = new LogContext(syncLogStream, logStorage);
            return context;
        }

        /** Closes the wrapped log stream. */
        @Override
        public void close() {
            logStream.close();
        }

        /** Returns the record writer that was created when this context was constructed. */
        public LogStreamRecordWriter getLogStreamWriter() {
            return logStreamWriter;
        }

        /** Returns the wrapped log stream. */
        public SynchronousLogStream getLogStream() {
            return logStream;
        }

        /** Asks the wrapped log stream for a new record writer and returns it. */
        public LogStreamRecordWriter newLogStreamRecordWriter() {
            final LogStreamRecordWriter freshWriter = logStream.newLogStreamRecordWriter();
            return freshWriter;
        }
    }

    /**
     * Bundles a stream processor with the database it uses and the folders that belong to it, so
     * that all of them can be released together.
     */
    public static final class ProcessorContext implements AutoCloseable {
        private final LogContext logContext;
        private final ZeebeDb zeebeDb;
        private final StreamProcessor streamProcessor;
        private final Path runtimePath;
        private final Path snapshotPath;
        private boolean closed = false;

        private ProcessorContext(LogContext logContext, StreamProcessor streamProcessor, ZeebeDb zeebeDb, Path path, Path path2) {
            this.logContext = logContext;
            this.streamProcessor = streamProcessor;
            this.zeebeDb = zeebeDb;
            this.runtimePath = path;
            this.snapshotPath = path2;
        }

        /**
         * Creates a new context.
         *
         * @param logContext the context of the log stream the processor reads from
         * @param streamProcessor the processor to close when the context is closed
         * @param zeebeDb the database to close when the context is closed
         * @param path the runtime folder, deleted when the context is closed
         * @param path2 the folder snapshots are created in
         * @return the new context
         */
        public static ProcessorContext createStreamContext(LogContext logContext, StreamProcessor streamProcessor, ZeebeDb zeebeDb, Path path, Path path2) {
            return new ProcessorContext(logContext, streamProcessor, zeebeDb, path, path2);
        }

        /** Returns the log stream of the associated log context. */
        public SynchronousLogStream getLogStream() {
            return this.logContext.getLogStream();
        }

        /** Creates a database snapshot in the snapshot folder. */
        public void snapshot() {
            this.zeebeDb.createSnapshot(this.snapshotPath.toFile());
        }

        /**
         * Closes the stream processor, then the database, then deletes the runtime folder if it
         * exists. Calling it again after it ran is a no-op.
         *
         * <p>Every step is attempted even if an earlier one fails, so a failing stream processor
         * does not leave the database open or the runtime folder behind. The first failure is
         * rethrown once all steps ran; later failures are attached to it as suppressed exceptions.
         *
         * @throws Exception the first failure raised by any of the clean-up steps
         */
        @Override
        public void close() throws Exception {
            if (this.closed) {
                return;
            }
            TestStreams.LOG.debug("Close stream processor");

            Exception failure = null;
            try {
                this.streamProcessor.closeAsync().join();
            } catch (Exception e) {
                failure = e;
            }
            try {
                this.zeebeDb.close();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
            try {
                if (this.runtimePath.toFile().exists()) {
                    FileUtil.deleteFolder(this.runtimePath);
                }
            } catch (Exception e) {
                failure = chain(failure, e);
            }

            // All steps were attempted; repeating them on a second call would not help.
            this.closed = true;
            if (failure != null) {
                throw failure;
            }
        }

        /** Returns {@code first} with {@code next} suppressed, or {@code next} if there is no first. */
        private static Exception chain(Exception first, Exception next) {
            if (first == null) {
                return next;
            }
            first.addSuppressed(next);
            return first;
        }
    }

    public TestStreams(TemporaryFolder temporaryFolder, AutoCloseableRule autoCloseableRule, ActorScheduler actorScheduler) {
        this.dataDirectory = temporaryFolder;
        this.closeables = autoCloseableRule;
        this.actorScheduler = actorScheduler;
        Mockito.when(this.mockCommandResponseWriter.intent((Intent) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.key(ArgumentMatchers.anyLong())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.partitionId(ArgumentMatchers.anyInt())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.recordType((RecordType) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.rejectionType((RejectionType) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.rejectionReason((DirectBuffer) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.valueType((ValueType) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.valueWriter((BufferWriter) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(Boolean.valueOf(this.mockCommandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong()))).thenReturn(true);
        this.mockStreamProcessorListener = (StreamProcessorListener) Mockito.mock(StreamProcessorListener.class);
    }

    /**
     * Replaces the event applier factory that is passed to stream processors built afterwards.
     *
     * @param function creates the event applier from the mutable state
     */
    public void withEventApplierFactory(final Function<MutableZeebeState, EventApplier> function) {
        eventApplierFactory = function;
    }

    /**
     * Replaces the mode that is passed to stream processors built afterwards.
     *
     * @param streamProcessorMode the mode to build stream processors with
     */
    public void withStreamProcessorMode(final StreamProcessorMode streamProcessorMode) {
        final StreamProcessorMode requestedMode = streamProcessorMode;
        this.streamProcessorMode = requestedMode;
    }

    /** Returns the mocked command response writer that built stream processors are given. */
    public CommandResponseWriter getMockedResponseWriter() {
        final CommandResponseWriter responseWriter = mockCommandResponseWriter;
        return responseWriter;
    }

    /** Returns the mocked listener that is registered with every built stream processor. */
    public StreamProcessorListener getMockStreamProcessorListener() {
        final StreamProcessorListener listener = mockStreamProcessorListener;
        return listener;
    }

    /**
     * Creates a log stream with the given name on partition 0.
     *
     * @param str the log stream name
     * @return the created log stream
     */
    public SynchronousLogStream createLogStream(String str) {
        final int partitionId = 0;
        return createLogStream(str, partitionId);
    }

    /**
     * Creates a log stream backed by a fresh {@link ListLogStorage}.
     *
     * @param str the log stream name
     * @param i the partition id
     * @return the created log stream
     */
    public SynchronousLogStream createLogStream(String str, int i) {
        // The three-argument overload wires the storage's position listener in the same way.
        final ListLogStorage freshStorage = new ListLogStorage();
        return createLogStream(str, i, freshStorage);
    }

    /**
     * Creates a log stream on top of the given storage and registers the stream's
     * {@code setLastWrittenPosition} as the storage's position listener.
     *
     * @param str the log stream name
     * @param i the partition id
     * @param listLogStorage the storage backing the stream
     * @return the created log stream
     */
    public SynchronousLogStream createLogStream(String str, int i, ListLogStorage listLogStorage) {
        final Consumer<SyncLogStream> wirePositionListener =
                stream -> listLogStorage.setPositionListener(stream::setLastWrittenPosition);
        return createLogStream(str, i, listLogStorage, wirePositionListener);
    }

    /**
     * Builds the log stream, lets the caller post-process it, and registers it under its name.
     * The log context and the removal of its map entry are both handed to the closeables rule.
     *
     * @param str the log stream name, also used as the key in the context map
     * @param i the partition id
     * @param logStorage the storage backing the stream
     * @param consumer invoked with the built stream before the context is created
     * @return the built log stream
     */
    private SynchronousLogStream createLogStream(String str, int i, LogStorage logStorage, Consumer<SyncLogStream> consumer) {
        final SyncLogStream logStream =
                SyncLogStream.builder()
                        .withLogName(str)
                        .withLogStorage(logStorage)
                        .withPartitionId(i)
                        .withActorSchedulingService(actorScheduler)
                        .build();
        consumer.accept(logStream);

        final LogContext context = LogContext.createLogContext(logStream, logStorage);
        logContextMap.put(str, context);
        closeables.manage(context);
        closeables.manage(() -> {
            logContextMap.remove(str);
        });
        return logStream;
    }

    /**
     * Returns the last written position reported by the named log stream.
     *
     * @param str the log stream name
     */
    public long getLastWrittenPosition(String str) {
        final SynchronousLogStream logStream = getLogStream(str);
        return logStream.getLastWrittenPosition();
    }

    /**
     * Returns the log stream registered under the given name.
     *
     * @param str the log stream name; must have been registered through {@code createLogStream}
     */
    public SynchronousLogStream getLogStream(String str) {
        final LogContext context = logContextMap.get(str);
        return context.getLogStream();
    }

    /**
     * Returns the record writer that was created together with the named log stream's context.
     *
     * @param str the log stream name
     */
    public LogStreamRecordWriter getLogStreamRecordWriter(String str) {
        final LogContext context = logContextMap.get(str);
        return context.getLogStreamWriter();
    }

    /**
     * Requests a new record writer from the named log stream's context.
     *
     * @param str the log stream name
     */
    public LogStreamRecordWriter newLogStreamRecordWriter(String str) {
        final LogContext context = logContextMap.get(str);
        return context.newLogStreamRecordWriter();
    }

    /**
     * Streams the events of the named log stream, starting at the first event.
     *
     * <p>The reader backing the stream is registered with the closeables rule. The returned
     * stream is sequential and draws its elements from that single reader.
     *
     * @param str the log stream name
     * @return a sequential stream over the reader's events
     */
    public Stream<LoggedEvent> events(String str) {
        final LogStreamReader reader = getLogStream(str).newLogStreamReader();
        closeables.manage(reader);
        reader.seekToFirstEvent();

        // The reader itself acts as the iterator, so every iteration shares its position.
        final Iterable<LoggedEvent> readerView = () -> reader;
        return StreamSupport.stream(readerView.spliterator(), false);
    }

    /**
     * Starts a fluent record definition that is written with the given writer.
     *
     * @param logStreamRecordWriter the writer to write the record with
     */
    public FluentLogWriter newRecord(LogStreamRecordWriter logStreamRecordWriter) {
        final FluentLogWriter fluentWriter = new FluentLogWriter(logStreamRecordWriter);
        return fluentWriter;
    }

    /**
     * Starts a fluent record definition backed by a new record writer of the named log stream.
     *
     * @param str the log stream name
     */
    public FluentLogWriter newRecord(String str) {
        final LogStreamRecordWriter recordWriter = newLogStreamRecordWriter(str);
        return new FluentLogWriter(recordWriter);
    }

    /**
     * Ensures that {@code <data root>/<log name>/state} exists and returns the path of its
     * {@code runtime} child. Only the {@code state} directory is created here.
     *
     * @param synchronousLogStream the log stream whose name selects the directory
     * @return the {@code runtime} path below the state directory
     * @throws UncheckedIOException if the state directory cannot be created
     */
    public Path createRuntimeFolder(SynchronousLogStream synchronousLogStream) {
        final Path dataRoot = dataDirectory.getRoot().toPath();
        final Path stateDirectory = dataRoot.resolve(synchronousLogStream.getLogName()).resolve("state");
        try {
            Files.createDirectories(stateDirectory);
        } catch (FileAlreadyExistsException ignored) {
            // Deliberately ignored, the method carries on with the existing path.
        } catch (IOException ioFailure) {
            throw new UncheckedIOException(ioFailure);
        }
        return stateDirectory.resolve("runtime");
    }

    /**
     * Starts a stream processor on the named log stream without an additional listener.
     *
     * @param str the log stream name
     * @param zeebeDbFactory creates the database for the processor
     * @param typedRecordProcessorFactory creates the record processors
     * @return the started stream processor
     */
    public StreamProcessor startStreamProcessor(String str, ZeebeDbFactory zeebeDbFactory, TypedRecordProcessorFactory typedRecordProcessorFactory) {
        final Optional<StreamProcessorListener> noExtraListener = Optional.empty();
        return startStreamProcessor(str, zeebeDbFactory, typedRecordProcessorFactory, noExtraListener);
    }

    /**
     * Starts a stream processor on the named log stream and waits for it to report recovery.
     *
     * @param str the log stream name
     * @param zeebeDbFactory creates the database for the processor
     * @param typedRecordProcessorFactory creates the record processors
     * @param optional an additional listener to register, if present
     * @return the started stream processor
     */
    public StreamProcessor startStreamProcessor(String str, ZeebeDbFactory zeebeDbFactory, TypedRecordProcessorFactory typedRecordProcessorFactory, Optional<StreamProcessorListener> optional) {
        final SynchronousLogStream logStream = getLogStream(str);
        final boolean awaitOpening = true;
        return buildStreamProcessor(logStream, zeebeDbFactory, typedRecordProcessorFactory, awaitOpening, optional);
    }

    /**
     * Starts a stream processor on the named log stream without waiting for the recovery
     * callback.
     *
     * @param str the log stream name
     * @param zeebeDbFactory creates the database for the processor
     * @param typedRecordProcessorFactory creates the record processors
     * @param optional an additional listener to register, if present
     * @return the started stream processor
     */
    public StreamProcessor startStreamProcessorNotAwaitOpening(String str, ZeebeDbFactory zeebeDbFactory, TypedRecordProcessorFactory typedRecordProcessorFactory, Optional<StreamProcessorListener> optional) {
        final SynchronousLogStream logStream = getLogStream(str);
        final boolean awaitOpening = false;
        return buildStreamProcessor(logStream, zeebeDbFactory, typedRecordProcessorFactory, awaitOpening, optional);
    }

    /* JADX WARN: Type inference failed for: r0v6, types: [io.camunda.zeebe.engine.util.TestStreams$1] */
    public StreamProcessor buildStreamProcessor(SynchronousLogStream synchronousLogStream, ZeebeDbFactory zeebeDbFactory, TypedRecordProcessorFactory typedRecordProcessorFactory, boolean z, Optional<StreamProcessorListener> optional) {
        Path createRuntimeFolder = createRuntimeFolder(synchronousLogStream);
        Path resolve = createRuntimeFolder.getParent().resolve(SNAPSHOT_FOLDER);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ?? r0 = new StreamProcessorLifecycleAware() { // from class: io.camunda.zeebe.engine.util.TestStreams.1
            public void onRecovered(ReadonlyStreamProcessorContext readonlyStreamProcessorContext) {
                countDownLatch.countDown();
            }
        };
        TypedRecordProcessorFactory typedRecordProcessorFactory2 = typedRecordProcessorContext -> {
            return typedRecordProcessorFactory.createProcessors(typedRecordProcessorContext).withListener(r0);
        };
        ZeebeDb createDb = this.snapshotWasTaken ? zeebeDbFactory.createDb(resolve.toFile()) : zeebeDbFactory.createDb(createRuntimeFolder.toFile());
        String logName = synchronousLogStream.getLogName();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.mockStreamProcessorListener);
        Objects.requireNonNull(arrayList);
        optional.ifPresent((v1) -> {
            r1.add(v1);
        });
        StreamProcessor build = StreamProcessor.builder().logStream(synchronousLogStream.getAsyncLogStream()).zeebeDb(createDb).actorSchedulingService(this.actorScheduler).commandResponseWriter(this.mockCommandResponseWriter).listener(new StreamProcessorListenerRelay(arrayList)).recordProcessors(List.of(new Engine(typedRecordProcessorFactory2))).eventApplierFactory(this.eventApplierFactory).streamProcessorMode(this.streamProcessorMode).partitionCommandSender((InterPartitionCommandSender) Mockito.mock(InterPartitionCommandSender.class)).build();
        ActorFuture openAsync = build.openAsync(false);
        if (z) {
            try {
                countDownLatch.await(15L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.interrupted();
            }
        }
        openAsync.join(15L, TimeUnit.SECONDS);
        ProcessorContext createStreamContext = ProcessorContext.createStreamContext(this.logContextMap.get(logName), build, createDb, createRuntimeFolder, resolve);
        this.streamContextMap.put(logName, createStreamContext);
        this.closeables.manage(createStreamContext);
        return build;
    }

    /**
     * Pauses the stream processor registered for the named stream and waits for the pause
     * request to complete.
     *
     * @param str the log stream name
     */
    public void pauseProcessing(String str) {
        final ProcessorContext context = streamContextMap.get(str);
        context.streamProcessor.pauseProcessing().join();
        LOG.info("Paused processing for stream {}", str);
    }

    /**
     * Requests the stream processor registered for the named stream to resume; the call does not
     * wait on any result.
     *
     * @param str the log stream name
     */
    public void resumeProcessing(String str) {
        final ProcessorContext context = streamContextMap.get(str);
        context.streamProcessor.resumeProcessing();
        LOG.info("Resume processing for stream {}", str);
    }

    /**
     * Snapshots the database of the named stream's processor and remembers that a snapshot
     * exists, which makes processors built later open the snapshot folder.
     *
     * @param str the log stream name
     */
    public void snapshot(String str) {
        final ProcessorContext context = streamContextMap.get(str);
        context.snapshot();
        snapshotWasTaken = true;
        LOG.info("Snapshot database for stream {}", str);
    }

    /**
     * Unregisters the processor context of the named stream and closes it.
     *
     * @param str the log stream name
     * @throws Exception if closing the context fails
     */
    public void closeProcessor(String str) throws Exception {
        final ProcessorContext removedContext = streamContextMap.remove(str);
        removedContext.close();
        LOG.info("Closed stream {}", str);
    }

    /**
     * Returns the stream processor registered for the named stream.
     *
     * @param str the log stream name
     * @return the registered stream processor
     * @throws NoSuchElementException if no stream processor is registered under that name
     */
    public StreamProcessor getStreamProcessor(String str) {
        final Optional<ProcessorContext> context = Optional.ofNullable(streamContextMap.get(str));
        return context
                .map(found -> found.streamProcessor)
                .orElseThrow(() -> new NoSuchElementException("No stream processor found with name: " + str));
    }

    /**
     * Creates a batch writer on the named log stream and adds one entry per given record.
     * Nothing is written here; the populated writer is returned to the caller.
     *
     * @param str the log stream name
     * @param recordToWriteArr the records to add, in order
     * @return the batch writer holding the added entries
     */
    public LogStreamBatchWriter setupBatchWriter(String str, RecordToWrite[] recordToWriteArr) {
        final SynchronousLogStream logStream = getLogStream(str);
        final LogStreamBatchWriter batchWriter = logStream.newLogStreamBatchWriter();
        for (final RecordToWrite entry : recordToWriteArr) {
            batchWriter
                    .event()
                    .key(entry.getKey())
                    .sourceIndex(entry.getSourceIndex())
                    .metadataWriter(entry.getRecordMetadata())
                    .valueWriter(entry.getUnifiedRecordValue())
                    .done();
        }
        return batchWriter;
    }

    // Inverts the event registry (value type -> class) into the class -> value type lookup.
    static {
        TypedEventRegistry.EVENT_REGISTRY.forEach(
                (registeredType, valueClass) -> VALUE_TYPES.put(valueClass, registeredType));
    }
}
