package io.debezium.connector.spanner.processor;

import com.google.common.annotations.VisibleForTesting;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.spanner.SpannerConnectorConfig;
import io.debezium.connector.spanner.SpannerPartition;
import io.debezium.connector.spanner.context.source.SourceInfo;
import io.debezium.connector.spanner.context.source.SourceInfoFactory;
import io.debezium.connector.spanner.db.metadata.SchemaRegistry;
import io.debezium.connector.spanner.db.metadata.TableId;
import io.debezium.connector.spanner.exception.SpannerConnectorException;
import io.debezium.connector.spanner.kafka.KafkaPartitionInfoProvider;
import io.debezium.data.Envelope;
import io.debezium.heartbeat.HeartbeatFactory;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.topic.TopicNamingStrategy;
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/processor/SpannerEventDispatcher.class */
public class SpannerEventDispatcher extends EventDispatcher<SpannerPartition, TableId> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpannerEventDispatcher.class);
    private final SpannerConnectorConfig connectorConfig;
    private final ChangeEventQueue<DataChangeEvent> queue;
    private final TopicNamingStrategy<TableId> topicNamingStrategy;
    private final SchemaRegistry schemaRegistry;
    private final DatabaseSchema<TableId> schema;
    private final SourceInfoFactory sourceInfoFactory;
    private final KafkaPartitionInfoProvider kafkaPartitionInfoProvider;

    public SpannerEventDispatcher(SpannerConnectorConfig spannerConnectorConfig, TopicNamingStrategy<TableId> topicNamingStrategy, DatabaseSchema<TableId> databaseSchema, ChangeEventQueue<DataChangeEvent> changeEventQueue, DataCollectionFilters.DataCollectionFilter<TableId> dataCollectionFilter, ChangeEventCreator changeEventCreator, EventMetadataProvider eventMetadataProvider, HeartbeatFactory<TableId> heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster, SchemaRegistry schemaRegistry, SourceInfoFactory sourceInfoFactory, KafkaPartitionInfoProvider kafkaPartitionInfoProvider) {
        super(spannerConnectorConfig, topicNamingStrategy, databaseSchema, changeEventQueue, dataCollectionFilter, changeEventCreator, eventMetadataProvider, heartbeatFactory.createHeartbeat(), schemaNameAdjuster);
        this.connectorConfig = spannerConnectorConfig;
        this.queue = changeEventQueue;
        this.topicNamingStrategy = topicNamingStrategy;
        this.schemaRegistry = schemaRegistry;
        this.schema = databaseSchema;
        this.sourceInfoFactory = sourceInfoFactory;
        this.kafkaPartitionInfoProvider = kafkaPartitionInfoProvider;
    }

    public boolean publishLowWatermarkStampEvent() {
        try {
            for (TableId tableId : this.schemaRegistry.getAllTables()) {
                String dataChangeTopic = this.topicNamingStrategy.dataChangeTopic(tableId);
                DataCollectionSchema schemaFor = this.schema.schemaFor(tableId);
                Struct struct = this.sourceInfoFactory.getSourceInfoForLowWatermarkStamp(tableId).struct();
                Iterator<Integer> it = this.kafkaPartitionInfoProvider.getPartitions(dataChangeTopic, Optional.of(Integer.valueOf(this.connectorConfig.getTopicNumPartitions()))).iterator();
                while (it.hasNext()) {
                    SourceRecord emitSourceRecord = emitSourceRecord(dataChangeTopic, schemaFor, it.next().intValue(), struct);
                    LOGGER.debug("Build low watermark stamp record {} ", emitSourceRecord);
                    this.queue.enqueue(new DataChangeEvent(emitSourceRecord));
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e2) {
            if (CommonConnectorConfig.EventProcessingFailureHandlingMode.FAIL.equals(this.connectorConfig.getEventProcessingFailureHandlingMode())) {
                throw new SpannerConnectorException("Error while publishing watermark stamp", e2);
            }
            LOGGER.warn("Error while publishing watermark stamp");
            return false;
        }
    }

    @VisibleForTesting
    SourceRecord emitSourceRecord(String str, DataCollectionSchema dataCollectionSchema, int i, Struct struct) {
        return new SourceRecord((Map) null, (Map) null, str, Integer.valueOf(i), (Schema) null, (Object) null, dataCollectionSchema.getEnvelopeSchema().schema(), buildMessage(dataCollectionSchema.getEnvelopeSchema(), struct), (Long) null, SourceRecordUtils.from("watermark-" + UUID.randomUUID()));
    }

    @VisibleForTesting
    Struct buildMessage(Envelope envelope, Struct struct) {
        Struct struct2 = new Struct(envelope.schema());
        struct2.put("op", Envelope.Operation.MESSAGE.code());
        struct2.put(SourceInfo.SOURCE_KEY, struct);
        struct2.put("ts_ms", Long.valueOf(Instant.now().toEpochMilli()));
        return struct2;
    }

    public void destroy() {
        super.close();
    }

    public void close() {
    }
}
