package io.debezium.server.pravega;

import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.impl.UTF8StringSerializer;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.enterprise.event.Observes;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

/**
 * Integration test that reads events from the Pravega stream
 * {@value #STREAM_NAME} and passes once at least {@code MESSAGE_COUNT}
 * non-null events have been received.
 *
 * <p>The test runs with the PostgreSQL and Pravega test resources declared
 * below; the Pravega controller URI is obtained from
 * {@link PravegaTestResource#getControllerUri()}.
 */
@QuarkusTest
@QuarkusTestResource.List({@QuarkusTestResource(PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(PravegaTestResource.class)})
public class PravegaIT {
    /** Minimum number of events the test waits for before succeeding. */
    private static final int MESSAGE_COUNT = 4;

    /** Used as the Pravega scope, stream and reader-group name throughout the test. */
    protected static final String STREAM_NAME = "testc.inventory.customers";

    /** Reader used by the most recent test run; it is closed when the test method returns. */
    static EventStreamReader<String> reader;

    /**
     * Resets the offset store file by deleting it and creating it again,
     * so each instance starts from an empty file.
     */
    public PravegaIT() {
        Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(TestConfigSource.OFFSET_STORE_PATH);
    }

    /**
     * Enables test output printing when the connector start event is observed.
     *
     * @param connectorStartedEvent the observed start event (unused)
     */
    void setupDependencies(@Observes ConnectorStartedEvent connectorStartedEvent) {
        Testing.Print.enable();
    }

    /**
     * Fails loudly when the connector completes unsuccessfully.
     *
     * @param connectorCompletedEvent the observed completion event
     * @throws RuntimeException wrapping the connector error when the event reports failure
     */
    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) {
        if (!connectorCompletedEvent.isSuccess()) {
            throw new RuntimeException(connectorCompletedEvent.getError().get());
        }
    }

    /**
     * Creates a reader group on the test stream, then polls the stream until
     * at least {@code MESSAGE_COUNT} events have been read or the configured
     * wait time elapses (in which case Awaitility fails the test).
     */
    @Test
    public void testPravega() {
        final URI controllerUri = URI.create(PravegaTestResource.getControllerUri());
        final ClientConfig clientConfig = ClientConfig.builder().controllerURI(controllerUri).build();
        final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
                .stream(Stream.of(STREAM_NAME, STREAM_NAME))
                .disableAutomaticCheckpoints()
                .build();

        // The manager is only needed to create the group; scope it tightly so it
        // is closed exactly once, whether or not the rest of the test fails.
        try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(STREAM_NAME, clientConfig)) {
            readerGroupManager.createReaderGroup(STREAM_NAME, readerGroupConfig);
        }

        // Resources are closed in reverse order: the reader first, then the factory.
        try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(STREAM_NAME, clientConfig);
                EventStreamReader<String> streamReader = clientFactory.createReader(
                        "0", STREAM_NAME, new UTF8StringSerializer(), ReaderConfig.builder().build())) {
            reader = streamReader;
            final List<String> records = new ArrayList<>();
            Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
                // A null event means nothing arrived within the 2000 ms read timeout.
                final String event = streamReader.readNextEvent(2000L).getEvent();
                if (event != null) {
                    records.add(event);
                }
                return records.size() >= MESSAGE_COUNT;
            });
        }
    }
}
