package io.debezium.pipeline.source;

import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.TableId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.class */
/**
 * Skeleton for snapshot change event sources.
 *
 * <p>{@link #execute} prepares a {@link SnapshotContext}, registers it in the connector's bean
 * registry, optionally skips or delays the snapshot, delegates the actual work to
 * {@link #doExecute}, and finally reports the outcome (completed or aborted) to the
 * {@link SnapshotProgressListener} and the {@link NotificationService}.
 *
 * @param <P> the partition type
 * @param <O> the offset context type
 */
public abstract class AbstractSnapshotChangeEventSource<P extends Partition, O extends OffsetContext> implements SnapshotChangeEventSource<P, O>, AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSnapshotChangeEventSource.class);

    /**
     * Interval built from {@link CommonConnectorConfig#DEFAULT_RETRIABLE_RESTART_WAIT} interpreted as
     * milliseconds. Not used inside this class.
     * NOTE(review): presumably consumed by subclasses to throttle progress logging - confirm.
     */
    public static final Duration LOG_INTERVAL = Duration.ofMillis(CommonConnectorConfig.DEFAULT_RETRIABLE_RESTART_WAIT);

    private final CommonConnectorConfig connectorConfig;
    private final SnapshotProgressListener<P> snapshotProgressListener;
    protected final NotificationService<P, O> notificationService;

    /**
     * Mutable holder for the state of one snapshot run: the partition being snapshotted and the
     * offset associated with it.
     *
     * @param <P> the partition type
     * @param <O> the offset context type
     */
    public static class SnapshotContext<P extends Partition, O extends OffsetContext> implements AutoCloseable {
        public P partition;
        public O offset;

        /**
         * Creates a context for the given partition; {@link #offset} is left unset.
         *
         * @param p the partition this context belongs to
         */
        public SnapshotContext(P p) {
            this.partition = p;
        }

        /**
         * Does nothing in this base implementation.
         */
        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
        }
    }

    /**
     * @param commonConnectorConfig connector configuration (source of the snapshot delay and the bean registry)
     * @param snapshotProgressListener listener informed about snapshot start, completion and abortion
     * @param notificationService service used to emit initial-snapshot notifications
     */
    public AbstractSnapshotChangeEventSource(CommonConnectorConfig commonConnectorConfig, SnapshotProgressListener<P> snapshotProgressListener, NotificationService<P, O> notificationService) {
        this.connectorConfig = commonConnectorConfig;
        this.snapshotProgressListener = snapshotProgressListener;
        this.notificationService = notificationService;
    }

    /**
     * Builds the partition/offset pair used for notifications during {@link #execute}.
     *
     * <p>This base implementation pairs the context's partition with the supplied offset and
     * ignores {@code snapshottingTask}.
     *
     * @param snapshotContext the prepared snapshot context
     * @param o the offset passed to {@link #execute}
     * @param snapshottingTask the task describing the snapshot; unused here
     * @return the offsets holding exactly that partition and offset
     */
    protected Offsets<P, OffsetContext> getOffsets(SnapshotContext<P, O> snapshotContext, O o, SnapshottingTask snapshottingTask) {
        return Offsets.of(snapshotContext.partition, o);
    }

    /**
     * Runs the snapshot.
     *
     * <p>Steps: prepare and register the snapshot context, return early with a "skipped" result
     * if the task asks for it, wait for the configured snapshot delay, then call
     * {@link #doExecute}. Whatever the outcome, the final stage closes this source and reports
     * either completion or abortion.
     *
     * @param changeEventSourceContext context used to detect that the connector is stopping
     * @param p the partition to snapshot
     * @param o the offset passed through to {@link #doExecute}
     * @param snapshottingTask describes what has to be snapshotted
     * @return the result of {@link #doExecute}, or a skipped result
     * @throws InterruptedException if the delay or the snapshot itself is interrupted
     * @throws DebeziumException if {@link #doExecute} fails with any other exception
     * @throws RuntimeException if the snapshot context cannot be prepared or registered
     */
    @Override // io.debezium.pipeline.source.spi.SnapshotChangeEventSource
    public SnapshotResult<O> execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, P p, O o, SnapshottingTask snapshottingTask) throws InterruptedException {
        // Only context initialisation is covered by this handler, so that later failures are not
        // mislabelled and InterruptedException can propagate as declared.
        final SnapshotContext<P, O> snapshotContext;
        try {
            snapshotContext = prepare(p, snapshottingTask.isOnDemand());
            this.connectorConfig.getBeanRegistry().add(StandardBeanNames.SNAPSHOT_CONTEXT, snapshotContext);
        } catch (Exception e) {
            LOGGER.error("Failed to initialize snapshot context.", e);
            throw new RuntimeException(e);
        }

        final Offsets<P, OffsetContext> offsets = getOffsets(snapshotContext, o, snapshottingTask);
        if (snapshottingTask.shouldSkipSnapshot()) {
            LOGGER.debug("Skipping snapshotting");
            this.notificationService.initialSnapshotNotificationService().notifySkipped(offsets.getTheOnlyPartition(), offsets.getTheOnlyOffset());
            return SnapshotResult.skipped(o);
        }

        delaySnapshotIfNeeded(changeEventSourceContext);

        // Flipped to true only once doExecute has returned normally, so every failure
        // (including Errors) takes the "aborted" branch of the final stage.
        boolean completedSuccessfully = false;
        try {
            this.snapshotProgressListener.snapshotStarted(p);
            this.notificationService.initialSnapshotNotificationService().notifyStarted(offsets.getTheOnlyPartition(), offsets.getTheOnlyOffset());
            final SnapshotResult<O> result = doExecute(changeEventSourceContext, o, snapshotContext, snapshottingTask);
            completedSuccessfully = true;
            return result;
        } catch (InterruptedException e) {
            LOGGER.warn("Snapshot was interrupted before completion");
            throw e;
        } catch (Exception e) {
            throw new DebeziumException(e);
        } finally {
            LOGGER.info("Snapshot - Final stage");
            close();
            if (completedSuccessfully) {
                LOGGER.info("Snapshot completed");
                completed(snapshotContext);
                this.snapshotProgressListener.snapshotCompleted(p);
                this.notificationService.initialSnapshotNotificationService().notifyCompleted(offsets.getTheOnlyPartition(), offsets.getTheOnlyOffset());
            } else {
                LOGGER.warn("Snapshot was not completed successfully, it will be re-executed upon connector restart");
                aborted(snapshotContext);
                this.snapshotProgressListener.snapshotAborted(offsets.getTheOnlyPartition());
            }
        }
    }

    /**
     * Selects the data collections that have to be snapshotted.
     *
     * @param collection all candidate data collections
     * @param set patterns of the collections to include; when empty, every candidate is returned
     * @param <T> the data collection id type
     * @return a stream of the candidates matched by at least one pattern (or all of them if
     *         {@code set} is empty)
     */
    public <T extends DataCollectionId> Stream<T> determineDataCollectionsToBeSnapshotted(Collection<T> collection, Set<Pattern> set) {
        if (set.isEmpty()) {
            return collection.stream();
        }
        return collection.stream().filter(dataCollectionId -> set.stream().anyMatch(tableNameMatcher(dataCollectionId)));
    }

    /**
     * Creates a predicate telling whether a pattern fully matches the given data collection,
     * either by its identifier or, for {@link TableId}s only, by its double-quoted form.
     */
    private static <T extends DataCollectionId> Predicate<Pattern> tableNameMatcher(T t) {
        return pattern -> {
            if (pattern.matcher(t.identifier()).matches()) {
                return true;
            }
            // Only TableId offers a double-quoted representation; other id types must not be cast.
            return t instanceof TableId && pattern.matcher(((TableId) t).toDoubleQuotedString()).matches();
        };
    }

    /**
     * Blocks until the configured snapshot delay has elapsed; returns immediately when the delay
     * is zero or negative. While waiting, the remaining time is logged on every wake-up.
     *
     * @param changeEventSourceContext context polled to detect that the connector is no longer running
     * @throws InterruptedException if the connector stops running during the wait, or the pause is interrupted
     */
    protected void delaySnapshotIfNeeded(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) throws InterruptedException {
        final Duration snapshotDelay = this.connectorConfig.getSnapshotDelay();
        if (snapshotDelay.isZero() || snapshotDelay.isNegative()) {
            return;
        }
        final Threads.Timer timer = Threads.timer(Clock.SYSTEM, snapshotDelay);
        final Metronome parker = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
        while (!timer.expired()) {
            if (!changeEventSourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while awaiting initial snapshot delay");
            }
            LOGGER.info("The connector will wait for {}s before proceeding", timer.remaining().getSeconds());
            parker.pause();
        }
    }

    /**
     * Performs the actual snapshot. Called by {@link #execute} after the started notification
     * has been sent.
     *
     * @param changeEventSourceContext context used to detect that the connector is stopping
     * @param o the offset passed to {@link #execute}
     * @param snapshotContext the context returned by {@link #prepare}
     * @param snapshottingTask describes what has to be snapshotted
     * @return the snapshot result handed back to the caller of {@link #execute}
     * @throws Exception on any failure; {@link #execute} rethrows {@link InterruptedException}
     *         as is and wraps everything else in a {@link DebeziumException}
     */
    protected abstract SnapshotResult<O> doExecute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, O o, SnapshotContext<P, O> snapshotContext, SnapshottingTask snapshottingTask) throws Exception;

    /**
     * Creates the context for a snapshot run.
     *
     * @param p the partition to snapshot
     * @param z whether the snapshot is on demand ({@code snapshottingTask.isOnDemand()})
     * @return the snapshot context
     * @throws Exception if the context cannot be created; {@link #execute} wraps it in a {@link RuntimeException}
     */
    protected abstract SnapshotContext<P, O> prepare(P p, boolean z) throws Exception;

    /**
     * Invoked in the final stage of every non-skipped {@link #execute} run. Does nothing in this
     * base implementation.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
    }

    /**
     * Hook invoked after a snapshot finished successfully. Does nothing in this base implementation.
     *
     * @param snapshotContext the context of the finished snapshot
     */
    protected void completed(SnapshotContext<P, O> snapshotContext) {
    }

    /**
     * Hook invoked after a snapshot failed or was interrupted. Does nothing in this base implementation.
     *
     * @param snapshotContext the context of the aborted snapshot
     */
    protected void aborted(SnapshotContext<P, O> snapshotContext) {
    }

    /**
     * Compiles data collection expressions into case-insensitive patterns.
     *
     * @param list expressions, each handed to {@link Strings#setOfRegex(String, int)}
     * @return the union of all patterns produced for the given expressions
     */
    public Set<Pattern> getDataCollectionPattern(List<String> list) {
        return list.stream()
                .map(expression -> Strings.setOfRegex(expression, Pattern.CASE_INSENSITIVE))
                .flatMap(Collection::stream)
                .collect(Collectors.toSet());
    }
}
