package io.debezium.connector.vitess;

import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.ReplicationMessage;
import io.debezium.connector.vitess.connection.ReplicationMessageProcessor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/vitess/VitessStreamingChangeEventSource.class */
/**
 * Streaming change event source of the Vitess connector.
 *
 * <p>It starts streaming on the {@link ReplicationConnection} from the restart VGTID held by the
 * {@link VitessOffsetContext} and translates every received {@link ReplicationMessage} into
 * offset updates and {@link EventDispatcher} calls.
 */
public class VitessStreamingChangeEventSource implements StreamingChangeEventSource<VitessPartition, VitessOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(VitessStreamingChangeEventSource.class);
    private final EventDispatcher<VitessPartition, TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final VitessDatabaseSchema schema;
    private final VitessConnectorConfig connectorConfig;
    private final ReplicationConnection replicationConnection;
    /** Constant delay, taken from the configured poll interval, used between checks of the streaming state. */
    private final DelayStrategy pauseNoMessage;

    /**
     * Creates the streaming source and derives the polling delay from the connector configuration.
     *
     * @param eventDispatcher dispatcher that receives transaction and data change events
     * @param errorHandler handler that is notified about any failure raised while streaming
     * @param clock clock passed to the initial offset context and to the change record emitters
     * @param vitessDatabaseSchema schema passed to the change record emitters
     * @param vitessConnectorConfig connector configuration; also supplies the poll interval
     * @param replicationConnection connection used to stream changes; closed when {@link #execute} ends
     */
    public VitessStreamingChangeEventSource(EventDispatcher<VitessPartition, TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, VitessDatabaseSchema vitessDatabaseSchema, VitessConnectorConfig vitessConnectorConfig, ReplicationConnection replicationConnection) {
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = vitessDatabaseSchema;
        this.connectorConfig = vitessConnectorConfig;
        this.replicationConnection = replicationConnection;
        this.pauseNoMessage = DelayStrategy.constant(vitessConnectorConfig.getPollInterval());
        LOGGER.info("VitessStreamingChangeEventSource is created");
    }

    /**
     * Starts streaming and waits until the context stops running or the replication connection
     * reports an error.
     *
     * <p>Failures (either thrown while starting/waiting or reported through the error reference
     * handed to the connection) are passed to the {@link ErrorHandler} instead of being rethrown.
     * The replication connection is closed exactly once when this method ends, whatever the
     * outcome; a failure to close is only logged.
     *
     * @param changeEventSourceContext context whose running state controls how long streaming lasts
     * @param vitessPartition partition attached to every dispatched event
     * @param vitessOffsetContext offset to resume from; when {@code null} an initial context is created
     */
    @Override
    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, VitessPartition vitessPartition, VitessOffsetContext vitessOffsetContext) {
        final VitessOffsetContext offsetContext = vitessOffsetContext != null
                ? vitessOffsetContext
                : VitessOffsetContext.initialContext(this.connectorConfig, this.clock);
        try {
            // The connection reports streaming failures through this reference.
            AtomicReference<Throwable> streamingError = new AtomicReference<>();
            this.replicationConnection.startStreaming(
                    offsetContext.getRestartVgtid(),
                    newReplicationMessageProcessor(vitessPartition, offsetContext),
                    streamingError);

            // Messages are handled by the processor; this thread only polls for shutdown or failure.
            while (changeEventSourceContext.isRunning() && streamingError.get() == null) {
                this.pauseNoMessage.sleepWhen(true);
            }

            Throwable failure = streamingError.get();
            if (failure != null) {
                LOGGER.error("Error during streaming", failure);
                throw failure;
            }
        } catch (Throwable th) {
            // Closing is left to the finally block so the connection is not closed twice on failure.
            this.errorHandler.setProducerThrowable(th);
        } finally {
            try {
                this.replicationConnection.close();
            } catch (Exception e) {
                LOGGER.error("Failed to close replicationConnection", e);
            }
        }
    }

    /**
     * Builds the callback that handles each replication message.
     *
     * <ul>
     *   <li>Transactional messages rotate the VGTID; BEGIN and COMMIT additionally dispatch the
     *       matching transaction event, other transactional operations dispatch nothing.</li>
     *   <li>DDL and OTHER messages only rotate the VGTID.</li>
     *   <li>Every remaining message is dispatched as a data change event for its table.</li>
     * </ul>
     *
     * @param vitessPartition partition attached to every dispatched event
     * @param vitessOffsetContext offset context updated while messages are processed
     * @return the message processor to hand to the replication connection
     */
    private ReplicationMessageProcessor newReplicationMessageProcessor(VitessPartition vitessPartition, VitessOffsetContext vitessOffsetContext) {
        return (replicationMessage, vgtid, shouldResetVgtid) -> {
            if (replicationMessage.isTransactionalMessage()) {
                vitessOffsetContext.rotateVgtid(vgtid, replicationMessage.getCommitTime());
                ReplicationMessage.Operation txOperation = replicationMessage.getOperation();
                if (txOperation == ReplicationMessage.Operation.BEGIN) {
                    this.dispatcher.dispatchTransactionStartedEvent(vitessPartition, replicationMessage.getTransactionId(), vitessOffsetContext, replicationMessage.getCommitTime());
                } else if (txOperation == ReplicationMessage.Operation.COMMIT) {
                    this.dispatcher.dispatchTransactionCommittedEvent(vitessPartition, vitessOffsetContext, replicationMessage.getCommitTime());
                }
                return;
            }

            ReplicationMessage.Operation operation = replicationMessage.getOperation();
            if (operation == ReplicationMessage.Operation.DDL || operation == ReplicationMessage.Operation.OTHER) {
                // Nothing to emit for these operations; only the offset advances.
                vitessOffsetContext.rotateVgtid(vgtid, replicationMessage.getCommitTime());
                return;
            }

            TableId tableId = VitessDatabaseSchema.parse(replicationMessage.getTable());
            Objects.requireNonNull(tableId);
            vitessOffsetContext.event(tableId, replicationMessage.getCommitTime());
            vitessOffsetContext.setShard(replicationMessage.getShard());
            if (shouldResetVgtid) {
                // The offset must be reset before the event is dispatched so the emitted record sees it.
                vitessOffsetContext.resetVgtid(vgtid, replicationMessage.getCommitTime());
            }
            this.dispatcher.dispatchDataChangeEvent(vitessPartition, tableId, new VitessChangeRecordEmitter(vitessPartition, vitessOffsetContext, this.clock, this.connectorConfig, this.schema, replicationMessage));
        };
    }
}
