package io.debezium.server.pubsub;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.NotFoundException;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.TopicName;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.server.DebeziumServer;
import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@QuarkusTest
@QuarkusTestResource.List({@QuarkusTestResource(PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(PubSubTestResourceLifecycleManager.class)})
/* loaded from: input_file:io/debezium/server/pubsub/PubSubIT.class */
public class PubSubIT {
    private static final int MESSAGE_COUNT = 4;
    protected static Subscriber subscriber;
    private static ManagedChannel channel;
    private static TransportChannelProvider channelProvider;
    private static CredentialsProvider credentialsProvider;

    @Inject
    DebeziumServer server;

    @ConfigProperty(name = "debezium.source.database.port")
    String postgresPort;
    private static final String SUBSCRIPTION_NAME = "testsubs";
    private static ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(ServiceOptions.getDefaultProjectId(), SUBSCRIPTION_NAME);
    private static final String STREAM_NAME = "testc.inventory.customers";
    private static TopicName topicName = TopicName.of(ServiceOptions.getDefaultProjectId(), STREAM_NAME);
    private static final List<PubsubMessage> messages = Collections.synchronizedList(new ArrayList());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/debezium/server/pubsub/PubSubIT$TestMessageReceiver.class */
    public class TestMessageReceiver implements MessageReceiver {
        TestMessageReceiver() {
        }

        public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
            Testing.print("Message arrived: " + pubsubMessage);
            Assertions.assertTrue(pubsubMessage.getAttributesMap().containsKey("headerKey"));
            PubSubIT.messages.add(pubsubMessage);
            ackReplyConsumer.ack();
        }
    }

    public PubSubIT() {
        Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(PubSubTestConfigSource.OFFSET_STORE_PATH);
    }

    @AfterAll
    static void stop() throws IOException {
        if (subscriber != null) {
            subscriber.stopAsync();
            subscriber.awaitTerminated();
            SubscriptionAdminClient createSubscriptionAdminClient = createSubscriptionAdminClient();
            try {
                createSubscriptionAdminClient.deleteSubscription(subscriptionName);
                if (createSubscriptionAdminClient != null) {
                    createSubscriptionAdminClient.close();
                }
                TopicAdminClient createTopicAdminClient = createTopicAdminClient();
                try {
                    createTopicAdminClient.deleteTopic(topicName);
                    if (createTopicAdminClient != null) {
                        createTopicAdminClient.close();
                    }
                } catch (Throwable th) {
                    if (createTopicAdminClient != null) {
                        try {
                            createTopicAdminClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                if (createSubscriptionAdminClient != null) {
                    try {
                        createSubscriptionAdminClient.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
                throw th3;
            }
        }
        if (channel == null || channel.isShutdown()) {
            return;
        }
        channel.shutdown();
    }

    void setupDependencies(@Observes ConnectorStartedEvent connectorStartedEvent) throws IOException {
        Testing.Print.enable();
        createChannel();
        try {
            SubscriptionAdminClient createSubscriptionAdminClient = createSubscriptionAdminClient();
            try {
                createSubscriptionAdminClient.deleteSubscription(subscriptionName);
                if (createSubscriptionAdminClient != null) {
                    createSubscriptionAdminClient.close();
                }
            } finally {
            }
        } catch (NotFoundException e) {
        }
        try {
            TopicAdminClient createTopicAdminClient = createTopicAdminClient();
            try {
                createTopicAdminClient.deleteTopic(topicName);
                if (createTopicAdminClient != null) {
                    createTopicAdminClient.close();
                }
            } catch (Throwable th) {
                if (createTopicAdminClient != null) {
                    try {
                        createTopicAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (NotFoundException e2) {
        }
        TopicAdminClient createTopicAdminClient2 = createTopicAdminClient();
        try {
            Testing.print("Created topic: " + createTopicAdminClient2.createTopic(topicName).getName());
            if (createTopicAdminClient2 != null) {
                createTopicAdminClient2.close();
            }
            SubscriptionAdminClient createSubscriptionAdminClient2 = createSubscriptionAdminClient();
            try {
                createSubscriptionAdminClient2.createSubscription(subscriptionName, topicName, PushConfig.newBuilder().build(), 0);
                if (createSubscriptionAdminClient2 != null) {
                    createSubscriptionAdminClient2.close();
                }
                subscriber = createSubscriber();
                subscriber.startAsync().awaitRunning();
            } catch (Throwable th3) {
                if (createSubscriptionAdminClient2 != null) {
                    try {
                        createSubscriptionAdminClient2.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
                throw th3;
            }
        } catch (Throwable th5) {
            if (createTopicAdminClient2 != null) {
                try {
                    createTopicAdminClient2.close();
                } catch (Throwable th6) {
                    th5.addSuppressed(th6);
                }
            }
            throw th5;
        }
    }

    void createChannel() {
        channel = ManagedChannelBuilder.forTarget(PubSubTestResourceLifecycleManager.getEmulatorEndpoint()).usePlaintext().build();
        channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
        credentialsProvider = NoCredentialsProvider.create();
        Testing.print("Executing test towards pubsub emulator running at: " + PubSubTestResourceLifecycleManager.getEmulatorEndpoint());
    }

    Subscriber createSubscriber() {
        return Subscriber.newBuilder(subscriptionName, new TestMessageReceiver()).setChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider).build();
    }

    static SubscriptionAdminClient createSubscriptionAdminClient() throws IOException {
        return SubscriptionAdminClient.create(SubscriptionAdminSettings.newBuilder().setTransportChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider).build());
    }

    static TopicAdminClient createTopicAdminClient() throws IOException {
        return TopicAdminClient.create(TopicAdminSettings.newBuilder().setTransportChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider).build());
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) throws Exception {
        if (!connectorCompletedEvent.isSuccess()) {
            throw ((Exception) connectorCompletedEvent.getError().get());
        }
    }

    @Test
    public void testPubSub() throws Exception {
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
            return Boolean.valueOf(messages.size() >= MESSAGE_COUNT);
        });
        org.assertj.core.api.Assertions.assertThat(messages.size()).isGreaterThanOrEqualTo(MESSAGE_COUNT);
        messages.clear();
        PostgresConnection postgresConnection = new PostgresConnection(defaultJdbcConfig(), "debezium-server-test");
        try {
            postgresConnection.execute(new String[]{"INSERT INTO inventory.customers VALUES (10000, 'Test', 'PubSub', 'testpubsub@example.org')", "DELETE FROM inventory.customers WHERE id = 10000"});
            postgresConnection.close();
            Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
                return Boolean.valueOf(messages.size() >= 2);
            });
            org.assertj.core.api.Assertions.assertThat(messages.size()).isEqualTo(2);
        } catch (Throwable th) {
            try {
                postgresConnection.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private JdbcConfiguration defaultJdbcConfig() {
        return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")).with(CommonConnectorConfig.TOPIC_PREFIX, "dbserver1").withDefault(JdbcConfiguration.DATABASE, "postgres").withDefault(JdbcConfiguration.HOSTNAME, "localhost").withDefault(JdbcConfiguration.PORT, Integer.parseInt(this.postgresPort)).withDefault(JdbcConfiguration.USER, "postgres").withDefault(JdbcConfiguration.PASSWORD, "postgres").build();
    }
}
