package io.debezium.server.pubsub;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.TopicName;
import io.debezium.server.DebeziumServer;
import io.debezium.server.TestConfigSource;
import io.debezium.server.TestDatabase;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.util.Testing;
import io.quarkus.test.junit.QuarkusTest;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

@QuarkusTest
/* loaded from: input_file:io/debezium/server/pubsub/PubSubIT.class */
public class PubSubIT {
    private static final int MESSAGE_COUNT = 4;
    private static final String STREAM_NAME = "testc.inventory.customers";
    protected static Subscriber subscriber;

    @Inject
    DebeziumServer server;
    protected static TestDatabase db = null;
    private static final String SUBSCRIPTION_NAME = "testsubs";
    private static ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(ServiceOptions.getDefaultProjectId(), SUBSCRIPTION_NAME);
    private static final List<PubsubMessage> messages = Collections.synchronizedList(new ArrayList());

    /* loaded from: input_file:io/debezium/server/pubsub/PubSubIT$TestMessageReceiver.class */
    class TestMessageReceiver implements MessageReceiver {
        TestMessageReceiver() {
        }

        public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
            Testing.print("Message arrived: " + pubsubMessage);
            PubSubIT.messages.add(pubsubMessage);
            ackReplyConsumer.ack();
        }
    }

    public PubSubIT() {
        Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(PubSubTestConfigSource.OFFSET_STORE_PATH);
    }

    @AfterAll
    static void stop() throws IOException {
        if (db != null) {
            db.stop();
        }
        if (subscriber != null) {
            subscriber.stopAsync();
            subscriber.awaitTerminated();
            SubscriptionAdminClient create = SubscriptionAdminClient.create();
            Throwable th = null;
            try {
                create.deleteSubscription(subscriptionName);
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th3;
            }
        }
    }

    void setupDependencies(@Observes ConnectorStartedEvent connectorStartedEvent) throws IOException {
        Testing.Print.enable();
        SubscriptionAdminClient create = SubscriptionAdminClient.create();
        Throwable th = null;
        try {
            create.createSubscription(subscriptionName, TopicName.ofProjectTopicName(ServiceOptions.getDefaultProjectId(), STREAM_NAME), PushConfig.newBuilder().build(), 0);
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    create.close();
                }
            }
            subscriber = Subscriber.newBuilder(subscriptionName, new TestMessageReceiver()).build();
            subscriber.startAsync().awaitRunning();
            db = new TestDatabase();
            db.start();
        } catch (Throwable th3) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) throws Exception {
        if (!connectorCompletedEvent.isSuccess()) {
            throw ((Exception) connectorCompletedEvent.getError().get());
        }
    }

    @Test
    public void testPubSub() throws Exception {
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
            return Boolean.valueOf(messages.size() >= MESSAGE_COUNT);
        });
        Assertions.assertThat(messages.size() >= MESSAGE_COUNT);
    }
}
