package io.debezium.server.pulsar;

import io.debezium.server.DebeziumServer;
import io.debezium.server.TestConfigSource;
import io.debezium.server.TestDatabase;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.util.Testing;
import io.quarkus.test.junit.QuarkusTest;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

/**
 * Integration test that checks change events reach the Pulsar topic
 * {@value #TOPIC_NAME}.
 *
 * <p>The Pulsar client and the test database are created when a
 * {@link ConnectorStartedEvent} is observed, and released in {@link #stop()}.
 */
@QuarkusTest
public class PulsarIT {
    /** Minimum number of messages that must arrive on the topic for the test to pass. */
    private static final int MESSAGE_COUNT = 4;
    private static final String TOPIC_NAME = "testc.inventory.customers";
    /** Upper bound for a single receive attempt, so one poll can never block forever. */
    private static final int RECEIVE_TIMEOUT_SECONDS = 1;

    protected static TestDatabase db = null;
    // Assigned by the connector-started observer and polled by the test method,
    // hence volatile so the assignment is visible to the polling thread.
    protected static volatile PulsarClient pulsarClient;

    @Inject
    DebeziumServer server;

    /** Resets the offset store file so every run starts from a clean state. */
    public PulsarIT() {
        Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(PulsarTestConfigSource.OFFSET_STORE_PATH);
    }

    /**
     * Releases the Pulsar client and stops the test database, if they were created.
     *
     * @throws IOException if closing the Pulsar client fails
     */
    @AfterAll
    static void stop() throws IOException {
        final PulsarClient client = pulsarClient;
        try {
            if (client != null) {
                client.close();
            }
        }
        finally {
            // Stop the database even when closing the client failed.
            if (db != null) {
                db.stop();
            }
        }
    }

    /**
     * Creates the Pulsar client and starts the test database once the connector has started.
     *
     * @param connectorStartedEvent the observed start event (not inspected)
     * @throws IOException if the Pulsar client cannot be built
     */
    void setupDependencies(@Observes ConnectorStartedEvent connectorStartedEvent) throws IOException {
        Testing.Print.enable();
        pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestConfigSource.getServiceUrl()).build();
        db = new TestDatabase();
        db.start();
    }

    /**
     * Propagates the connector failure, if any, when the connector completes.
     *
     * @param connectorCompletedEvent the observed completion event
     * @throws Exception the reported error when it is an {@link Exception};
     *         otherwise an {@link IllegalStateException} wrapping the reported cause
     */
    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) throws Exception {
        if (connectorCompletedEvent.isSuccess()) {
            return;
        }
        final Throwable error = connectorCompletedEvent.getError().orElse(null);
        if (error instanceof Exception) {
            throw (Exception) error;
        }
        // Either no error was reported or it is not an Exception (e.g. an Error):
        // avoid a ClassCastException/NoSuchElementException and keep the cause attached.
        throw new IllegalStateException("Connector completed unsuccessfully", error);
    }

    /**
     * Subscribes to the topic and waits until at least {@link #MESSAGE_COUNT}
     * messages have been received.
     *
     * @throws Exception if subscribing fails or the expected messages do not arrive in time
     */
    @Test
    public void testPulsar() throws Exception {
        final Duration timeout = Duration.ofSeconds(PulsarTestConfigSource.waitForSeconds());

        // The client is created by the connector-started observer; wait for it
        // instead of failing with a NullPointerException if the test runs first.
        Awaitility.await().atMost(timeout).until(() -> pulsarClient != null);

        try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(TOPIC_NAME)
                .subscriptionName("test-" + UUID.randomUUID())
                .subscribe()) {
            final AtomicInteger received = new AtomicInteger();
            Awaitility.await().atMost(timeout).until(() -> {
                // Timed receive returns null when nothing arrived within the timeout.
                if (consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS) != null) {
                    received.incrementAndGet();
                }
                return received.get() >= MESSAGE_COUNT;
            });
        }
    }
}
