package io.debezium.server.pubsub;

import com.google.api.core.ApiFutures;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.ProjectId;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("pubsublite")
@Dependent
/* loaded from: input_file:io/debezium/server/pubsub/PubSubLiteChangeConsumer.class */
public class PubSubLiteChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PubSubLiteChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.pubsublite.";
    private static final String PROP_PROJECT_ID = "debezium.sink.pubsublite.project.id";
    private static final String PROP_REGION = "debezium.sink.pubsublite.region";
    private PublisherBuilder publisherBuilder;
    private final Map<String, Publisher> publishers = new HashMap();

    @ConfigProperty(name = "debezium.sink.pubsublite.ordering.enabled", defaultValue = "true")
    boolean orderingEnabled;

    @ConfigProperty(name = "debezium.sink.pubsublite.null.key", defaultValue = "default")
    String nullKey;

    @ConfigProperty(name = "debezium.sink.pubsublite.wait.message.delivery.timeout.ms", defaultValue = "30000")
    Integer waitMessageDeliveryTimeout;

    @Inject
    @CustomConsumerBuilder
    Instance<PublisherBuilder> customPublisherBuilder;

    /* loaded from: input_file:io/debezium/server/pubsub/PubSubLiteChangeConsumer$PublisherBuilder.class */
    public interface PublisherBuilder {
        Publisher get(String str);
    }

    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        String str = (String) config.getOptionalValue(PROP_PROJECT_ID, String.class).orElse(ServiceOptions.getDefaultProjectId());
        String str2 = (String) config.getValue(PROP_REGION, String.class);
        if (this.customPublisherBuilder.isResolvable()) {
            this.publisherBuilder = (PublisherBuilder) this.customPublisherBuilder.get();
            LOGGER.info("Obtained custom configured PublisherBuilder '{}'", this.customPublisherBuilder);
        } else {
            this.publisherBuilder = str3 -> {
                Publisher create = Publisher.create(PublisherSettings.newBuilder().setTopicPath(TopicPath.newBuilder().setName(TopicName.of(str3)).setProject(ProjectId.of(str)).setLocation(CloudRegionOrZone.parse(str2)).build()).build());
                create.startAsync().awaitRunning();
                return create;
            };
            LOGGER.info("Using default PublisherBuilder '{}'", this.publisherBuilder);
        }
    }

    @PreDestroy
    void close() {
        this.publishers.values().forEach(publisher -> {
            try {
                publisher.stopAsync().awaitTerminated();
            } catch (Exception e) {
                LOGGER.warn("Exception while closing publisher: " + e);
            }
        });
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        ArrayList arrayList = new ArrayList();
        for (ChangeEvent<Object, Object> changeEvent : list) {
            LOGGER.trace("Received event '{}'", changeEvent);
            arrayList.add(this.publishers.computeIfAbsent(this.streamNameMapper.map(changeEvent.destination()), str -> {
                return this.publisherBuilder.get(str);
            }).publish(buildPubSubMessage(changeEvent)));
            recordCommitter.markProcessed(changeEvent);
        }
        try {
            LOGGER.trace("Sent messages with ids: {}", (List) ApiFutures.allAsList(arrayList).get(this.waitMessageDeliveryTimeout.intValue(), TimeUnit.MILLISECONDS));
            recordCommitter.markBatchFinished();
        } catch (ExecutionException | TimeoutException e) {
            throw new DebeziumException(e);
        }
    }

    private PubsubMessage buildPubSubMessage(ChangeEvent<Object, Object> changeEvent) {
        PubsubMessage.Builder newBuilder = PubsubMessage.newBuilder();
        if (this.orderingEnabled) {
            if (changeEvent.key() == null) {
                newBuilder.setOrderingKey(this.nullKey);
            } else if (changeEvent.key() instanceof String) {
                newBuilder.setOrderingKey((String) changeEvent.key());
            } else if (changeEvent.key() instanceof byte[]) {
                newBuilder.setOrderingKeyBytes(ByteString.copyFrom((byte[]) changeEvent.key()));
            }
        }
        if (changeEvent.value() instanceof String) {
            newBuilder.setData(ByteString.copyFromUtf8((String) changeEvent.value()));
        } else if (changeEvent.value() instanceof byte[]) {
            newBuilder.setData(ByteString.copyFrom((byte[]) changeEvent.value()));
        }
        newBuilder.putAllAttributes(convertHeaders(changeEvent));
        return newBuilder.build();
    }

    public boolean supportsTombstoneEvents() {
        return false;
    }
}
