package io.debezium.server.pubsub;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectId;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.pubsub.v1.PubsubMessage;
import io.debezium.server.DebeziumServer;
import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

@QuarkusTest
@EnabledIfSystemProperty(named = "debezium.sink.type", matches = "pubsublite")
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
/* loaded from: input_file:io/debezium/server/pubsub/PubSubLiteIT.class */
public class PubSubLiteIT {
    private static final int MESSAGE_COUNT = 4;
    private static final String STREAM_NAME = "testc.inventory.customers";
    private static final char zoneId = 'b';
    protected static Subscriber subscriber;

    @Inject
    DebeziumServer server;
    private static final String projectId = ServiceOptions.getDefaultProjectId();
    private static final String cloudRegion = "us-central1";
    private static final String SUBSCRIPTION_NAME = "testsubs";
    private static final SubscriptionPath subscriptionPath = SubscriptionPath.newBuilder().setLocation(CloudZone.of(CloudRegion.of(cloudRegion), 'b')).setProject(ProjectId.of(projectId)).setName(SubscriptionName.of(SUBSCRIPTION_NAME)).build();
    private static final List<PubsubMessage> messages = Collections.synchronizedList(new ArrayList());

    /* loaded from: input_file:io/debezium/server/pubsub/PubSubLiteIT$TestMessageReceiver.class */
    class TestMessageReceiver implements MessageReceiver {
        TestMessageReceiver() {
        }

        public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
            Testing.print("Message arrived: " + pubsubMessage);
            PubSubLiteIT.messages.add(pubsubMessage);
            ackReplyConsumer.ack();
        }
    }

    public PubSubLiteIT() {
        Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(PubSubLiteTestConfigSource.OFFSET_STORE_PATH);
    }

    @AfterAll
    static void stop() throws IOException {
        if (subscriber != null) {
            subscriber.stopAsync();
            subscriber.awaitTerminated();
            try {
                AdminClient create = AdminClient.create(AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build());
                try {
                    create.deleteSubscription(subscriptionPath).get();
                    if (create != null) {
                        create.close();
                    }
                } finally {
                }
            } catch (InterruptedException | ExecutionException e) {
                Testing.printError(e);
            }
        }
    }

    void setupDependencies(@Observes ConnectorStartedEvent connectorStartedEvent) throws IOException {
        Testing.Print.enable();
        Subscription build = Subscription.newBuilder().setDeliveryConfig(Subscription.DeliveryConfig.newBuilder().setDeliveryRequirement(Subscription.DeliveryConfig.DeliveryRequirement.DELIVER_IMMEDIATELY)).setName(subscriptionPath.toString()).setTopic(TopicPath.newBuilder().setProject(ProjectId.of(projectId)).setLocation(CloudZone.of(CloudRegion.of(cloudRegion), 'b')).setName(TopicName.of(STREAM_NAME)).build().toString()).build();
        AdminClientSettings build2 = AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();
        SubscriberSettings build3 = SubscriberSettings.newBuilder().setSubscriptionPath(subscriptionPath).setReceiver(new TestMessageReceiver()).setPerPartitionFlowControlSettings(FlowControlSettings.builder().setBytesOutstanding(10485760L).setMessagesOutstanding(1000L).build()).build();
        try {
            AdminClient create = AdminClient.create(build2);
            try {
                create.createSubscription(build).get();
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (InterruptedException | ExecutionException e) {
            Testing.printError(e);
        }
        Subscriber.create(build3).startAsync().awaitRunning();
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) throws Exception {
        if (!connectorCompletedEvent.isSuccess()) {
            throw ((Exception) connectorCompletedEvent.getError().get());
        }
    }

    @Test
    public void testPubSubLite() throws Exception {
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
            return Boolean.valueOf(messages.size() >= MESSAGE_COUNT);
        });
        Assertions.assertThat(messages.size() >= MESSAGE_COUNT);
    }
}
