package io.debezium.server.pubsub;

import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

@Named("pubsub")
@Dependent
/* loaded from: input_file:io/debezium/server/pubsub/PubSubChangeConsumer.class */
public class PubSubChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PubSubChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.pubsub.";
    private static final String PROP_PROJECT_ID = "debezium.sink.pubsub.project.id";
    private String projectId;
    private final Map<String, Publisher> publishers = new HashMap();
    private PublisherBuilder publisherBuilder;

    @ConfigProperty(name = "debezium.sink.pubsub.ordering.enabled", defaultValue = "true")
    boolean orderingEnabled;

    @ConfigProperty(name = "debezium.sink.pubsub.null.key", defaultValue = "default")
    String nullKey;

    @ConfigProperty(name = "debezium.sink.pubsub.batch.delay.threshold.ms", defaultValue = "100")
    Integer maxDelayThresholdMs;

    @ConfigProperty(name = "debezium.sink.pubsub.batch.element.count.threshold", defaultValue = "100")
    Long maxBufferSize;

    @ConfigProperty(name = "debezium.sink.pubsub.batch.request.byte.threshold", defaultValue = "10000000")
    Long maxBufferBytes;

    @ConfigProperty(name = "debezium.sink.pubsub.flowcontrol.enabled", defaultValue = "false")
    boolean flowControlEnabled;

    @ConfigProperty(name = "debezium.sink.pubsub.flowcontrol.max.outstanding.messages", defaultValue = "9223372036854775807")
    Long maxOutstandingMessages;

    @ConfigProperty(name = "debezium.sink.pubsub.flowcontrol.max.outstanding.bytes", defaultValue = "9223372036854775807")
    Long maxOutstandingRequestBytes;

    @ConfigProperty(name = "debezium.sink.pubsub.retry.total.timeout.ms", defaultValue = "60000")
    Integer maxTotalTimeoutMs;

    @ConfigProperty(name = "debezium.sink.pubsub.retry.max.rpc.timeout.ms", defaultValue = "10000")
    Integer maxRequestTimeoutMs;

    @ConfigProperty(name = "debezium.sink.pubsub.retry.initial.delay.ms", defaultValue = "5")
    Integer initialRetryDelay;

    @ConfigProperty(name = "debezium.sink.pubsub.retry.delay.multiplier", defaultValue = "2.0")
    Double retryDelayMultiplier;

    @ConfigProperty(name = "debezium.sink.pubsub.retry.max.delay.ms", defaultValue = "9223372036854775807")
    Long maxRetryDelay;

    @ConfigProperty(name = "debezium.sink.pubsub.retry.initial.rpc.timeout.ms", defaultValue = "10000")
    Integer initialRpcTimeout;

    @ConfigProperty(name = "debezium.sink.pubsub.retry.rpc.timeout.multiplier", defaultValue = "2.0")
    Double rpcTimeoutMultiplier;

    @ConfigProperty(name = "debezium.sink.pubsub.address")
    Optional<String> address;

    @Inject
    @CustomConsumerBuilder
    Instance<PublisherBuilder> customPublisherBuilder;
    private ManagedChannel channel;
    private TransportChannelProvider channelProvider;
    private CredentialsProvider credentialsProvider;

    /* loaded from: input_file:io/debezium/server/pubsub/PubSubChangeConsumer$PublisherBuilder.class */
    public interface PublisherBuilder {
        Publisher get(ProjectTopicName projectTopicName);
    }

    @PostConstruct
    void connect() {
        this.projectId = (String) ConfigProvider.getConfig().getOptionalValue(PROP_PROJECT_ID, String.class).orElse(ServiceOptions.getDefaultProjectId());
        if (this.customPublisherBuilder.isResolvable()) {
            this.publisherBuilder = (PublisherBuilder) this.customPublisherBuilder.get();
            LOGGER.info("Obtained custom configured PublisherBuilder '{}'", this.customPublisherBuilder);
            return;
        }
        BatchingSettings.Builder requestByteThreshold = BatchingSettings.newBuilder().setDelayThreshold(Duration.ofMillis(this.maxDelayThresholdMs.intValue())).setElementCountThreshold(this.maxBufferSize).setRequestByteThreshold(this.maxBufferBytes);
        if (this.flowControlEnabled) {
            requestByteThreshold.setFlowControlSettings(FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(this.maxOutstandingRequestBytes).setMaxOutstandingElementCount(this.maxOutstandingMessages).setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block).build());
        }
        if (this.address.isPresent()) {
            this.channel = ManagedChannelBuilder.forTarget(this.address.get()).usePlaintext().build();
            this.channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(this.channel));
            this.credentialsProvider = NoCredentialsProvider.create();
        }
        this.publisherBuilder = projectTopicName -> {
            try {
                Publisher.Builder retrySettings = Publisher.newBuilder(projectTopicName).setEnableMessageOrdering(this.orderingEnabled).setBatchingSettings(requestByteThreshold.build()).setRetrySettings(RetrySettings.newBuilder().setTotalTimeout(Duration.ofMillis(this.maxTotalTimeoutMs.intValue())).setMaxRpcTimeout(Duration.ofMillis(this.maxRequestTimeoutMs.intValue())).setInitialRetryDelay(Duration.ofMillis(this.initialRetryDelay.intValue())).setRetryDelayMultiplier(this.retryDelayMultiplier.doubleValue()).setMaxRetryDelay(Duration.ofMillis(this.maxRetryDelay.longValue())).setInitialRpcTimeout(Duration.ofMillis(this.initialRpcTimeout.intValue())).setRpcTimeoutMultiplier(this.rpcTimeoutMultiplier.doubleValue()).build());
                if (this.address.isPresent()) {
                    retrySettings.setChannelProvider(this.channelProvider).setCredentialsProvider(this.credentialsProvider);
                }
                return retrySettings.build();
            } catch (IOException e) {
                throw new DebeziumException(e);
            }
        };
        LOGGER.info("Using default PublisherBuilder '{}'", this.publisherBuilder);
    }

    @PreDestroy
    void close() {
        this.publishers.values().forEach(publisher -> {
            try {
                publisher.shutdown();
            } catch (Exception e) {
                LOGGER.warn("Exception while closing publisher: {}", e);
            }
        });
        if (this.channel == null || this.channel.isShutdown()) {
            return;
        }
        this.channel.shutdown();
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        ArrayList arrayList = new ArrayList();
        for (ChangeEvent<Object, Object> changeEvent : list) {
            LOGGER.trace("Received event '{}'", changeEvent);
            Publisher computeIfAbsent = this.publishers.computeIfAbsent(this.streamNameMapper.map(changeEvent.destination()), str -> {
                return this.publisherBuilder.get(ProjectTopicName.of(this.projectId, str));
            });
            PubsubMessage.Builder newBuilder = PubsubMessage.newBuilder();
            if (this.orderingEnabled) {
                if (changeEvent.key() == null) {
                    newBuilder.setOrderingKey(this.nullKey);
                } else if (changeEvent.key() instanceof String) {
                    newBuilder.setOrderingKey((String) changeEvent.key());
                } else if (changeEvent.key() instanceof byte[]) {
                    newBuilder.setOrderingKeyBytes(ByteString.copyFrom((byte[]) changeEvent.key()));
                }
            }
            if (changeEvent.value() instanceof String) {
                newBuilder.setData(ByteString.copyFromUtf8((String) changeEvent.value()));
            } else if (changeEvent.value() instanceof byte[]) {
                newBuilder.setData(ByteString.copyFrom((byte[]) changeEvent.value()));
            }
            newBuilder.putAllAttributes(convertHeaders(changeEvent));
            arrayList.add(computeIfAbsent.publish(newBuilder.build()));
            recordCommitter.markProcessed(changeEvent);
        }
        try {
            LOGGER.trace("Sent messages with ids: {}", (List) ApiFutures.allAsList(arrayList).get());
            recordCommitter.markBatchFinished();
        } catch (ExecutionException e) {
            throw new DebeziumException(e);
        }
    }

    public boolean supportsTombstoneEvents() {
        return false;
    }
}
