package io.debezium.connector.common;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Metronome;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/common/BaseSourceTask.class */
/**
 * Common lifecycle handling for source tasks built on a {@link ChangeEventSourceCoordinator}.
 *
 * <p>The class implements the {@link SourceTask} entry points ({@code start}, {@code poll},
 * {@code stop}, {@code commitRecord}, {@code commit}) and delegates the connector-specific work
 * to {@link #start(Configuration)}, {@link #doPoll()} and {@link #doStop()}. When
 * {@link #doPoll()} throws a {@link RetriableException} the task is stopped and a later
 * {@link #poll()} call starts it again with the same properties once the restart delay allows it.
 *
 * <p>State transitions ({@code start}/{@code stop}) and {@link #commit()} are serialized through
 * {@code stateLock}.
 */
public abstract class BaseSourceTask extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseSourceTask.class);

    /** Delay, in milliseconds, handed to the restart strategy after a retriable error. */
    private static final long RESTART_DELAY_MILLIS = 10000L;

    /** How long {@link #poll()} pauses before returning when the task is not running. */
    private static final Duration NOT_RUNNING_POLL_PAUSE = Duration.of(2L, ChronoUnit.SECONDS);

    /** Current lifecycle state; a new task is {@link State#STOPPED}. */
    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);

    /** Serializes start, stop and offset commits. */
    private final ReentrantLock stateLock = new ReentrantLock();

    /**
     * Restart back-off; created on the first stop caused by a retriable error and never reset
     * afterwards. {@code null} until then.
     */
    private volatile ElapsedTimeStrategy restartDelay;

    /** Properties passed to the most recent {@link #start(Map)}; reused for automatic restarts. */
    private volatile Map<String, String> props;

    /** Coordinator returned by {@link #start(Configuration)}; only accessed while holding {@code stateLock}. */
    private ChangeEventSourceCoordinator coordinator;

    /** Most recent non-null source offset seen by {@link #commitRecord(SourceRecord)}. */
    private volatile Map<String, ?> lastOffset;

    /** Lifecycle states of the task. */
    public enum State {
        RUNNING,
        STOPPED
    }

    /**
     * Validates the configuration and starts the task.
     *
     * <p>Does nothing (apart from logging) when the task is already running.
     *
     * @param map the raw task properties; they are remembered for a later automatic restart
     * @throws ConnectException if no task context has been set or the configuration is invalid
     */
    public final void start(Map<String, String> map) {
        if (this.context == null) {
            throw new ConnectException("Unexpected null context");
        }
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.STOPPED, State.RUNNING)) {
                LOGGER.info("Connector has already been started");
                return;
            }
            this.props = map;
            Configuration config = Configuration.from(map);
            // NOTE(review): the state stays RUNNING if validation or start(config) throws, so a
            // following stop() still reaches doStop() — confirm this is the intended cleanup path.
            if (!config.validateAndRecord(getAllConfigurationFields(), LOGGER::error)) {
                throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Starting {} with configuration:", getClass().getSimpleName());
                config.withMaskedPasswords().forEach((key, value) -> {
                    LOGGER.info("   {} = {}", key, value);
                });
            }
            this.coordinator = start(config);
        } finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Connector-specific start-up, invoked by {@link #start(Map)} while the state lock is held.
     *
     * @param configuration the validated configuration
     * @return the coordinator to stop on {@link #stop()} and to notify on {@link #commit()};
     *         a {@code null} result is tolerated by both
     */
    protected abstract ChangeEventSourceCoordinator start(Configuration configuration);

    /**
     * Returns the next batch of records from {@link #doPoll()}.
     *
     * <p>If the task is not running and cannot be restarted yet, the call pauses for
     * {@link #NOT_RUNNING_POLL_PAUSE} and returns an empty list. If {@link #doPoll()} throws a
     * {@link RetriableException}, the task is stopped with a restart scheduled and the exception
     * is rethrown.
     *
     * @return the polled records, or an empty list while the task is not running
     * @throws InterruptedException if interrupted while pausing or polling
     */
    public final List<SourceRecord> poll() throws InterruptedException {
        if (!startIfNeededAndPossible()) {
            // A fresh metronome per call, exactly as before, so each pause is measured from now.
            Metronome.parker(NOT_RUNNING_POLL_PAUSE, Clock.SYSTEM).pause();
            return Collections.emptyList();
        }
        try {
            return doPoll();
        } catch (RetriableException e) {
            stop(true);
            throw e;
        }
    }

    /**
     * Connector-specific polling, invoked by {@link #poll()} while the task is running.
     *
     * @return the records to hand to the caller of {@link #poll()}
     * @throws InterruptedException if interrupted while waiting for records
     */
    protected abstract List<SourceRecord> doPoll() throws InterruptedException;

    /**
     * Makes sure the task is running, restarting it with the remembered properties if
     * {@code restartDelay} exists and reports that it has elapsed.
     *
     * @return {@code true} if the task is running when the method returns, {@code false} otherwise
     */
    private boolean startIfNeededAndPossible() {
        this.stateLock.lock();
        try {
            if (this.state.get() == State.RUNNING) {
                return true;
            }
            if (this.restartDelay == null || !this.restartDelay.hasElapsed()) {
                LOGGER.info("Awaiting end of restart backoff period after a retriable error");
                return false;
            }
            start(this.props);
            return true;
        } finally {
            this.stateLock.unlock();
        }
    }

    /** Stops the task without scheduling a restart. */
    public final void stop() {
        stop(false);
    }

    /**
     * Stops the coordinator and the connector-specific resources.
     *
     * <p>Does nothing (apart from logging) when the task is already stopped.
     *
     * @param restart {@code true} when stopping because of a retriable error; the restart delay
     *                strategy is then created if it does not exist yet
     * @throws ConnectException if the thread is interrupted while the coordinator is stopping;
     *                          the interrupt flag is restored and the interruption is the cause
     */
    private void stop(boolean restart) {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.RUNNING, State.STOPPED)) {
                LOGGER.info("Connector has already been stopped");
                return;
            }
            if (restart) {
                LOGGER.warn("Going to restart connector after {} sec. after a retriable exception", RESTART_DELAY_MILLIS / 1000L);
            } else {
                LOGGER.info("Stopping down connector");
            }
            try {
                if (this.coordinator != null) {
                    this.coordinator.stop();
                }
                // NOTE(review): doStop() is skipped when coordinator.stop() is interrupted, and the
                // state is already STOPPED so a later stop() will not reach it either — confirm.
                doStop();
                if (restart && this.restartDelay == null) {
                    this.restartDelay = ElapsedTimeStrategy.constant(Clock.system(), RESTART_DELAY_MILLIS);
                    // Result intentionally discarded; presumably this primes the strategy so that
                    // the next check honours the delay — verify against ElapsedTimeStrategy.
                    this.restartDelay.hasElapsed();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                LOGGER.error("Interrupted while stopping coordinator", e);
                throw new ConnectException("Interrupted while stopping coordinator, failing the task", e);
            }
        } finally {
            this.stateLock.unlock();
        }
    }

    /** Connector-specific shutdown, invoked by {@code stop} after the coordinator has been stopped. */
    protected abstract void doStop();

    /**
     * Remembers the source offset of the given record, if it has one.
     *
     * @param sourceRecord the record reported as committed
     * @throws InterruptedException declared for subclasses; this implementation does not throw it
     */
    public void commitRecord(SourceRecord sourceRecord) throws InterruptedException {
        Map<String, ?> sourceOffset = sourceRecord.sourceOffset();
        if (sourceOffset != null) {
            this.lastOffset = sourceOffset;
        }
    }

    /**
     * Passes the last remembered offset to the coordinator.
     *
     * <p>The commit is skipped with a warning when the state lock cannot be acquired immediately,
     * and silently when there is no coordinator or no remembered offset.
     *
     * @throws InterruptedException declared for subclasses; this implementation does not throw it
     */
    public void commit() throws InterruptedException {
        if (!this.stateLock.tryLock()) {
            LOGGER.warn("Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart");
            return;
        }
        try {
            // Read the volatile field once so the null check and the commit see the same value.
            Map<String, ?> offset = this.lastOffset;
            if (this.coordinator != null && offset != null) {
                this.coordinator.commitOffset(offset);
            }
        } finally {
            this.stateLock.unlock();
        }
    }

    /**
     * @return the configuration fields that {@link #start(Map)} validates
     */
    protected abstract Iterable<Field> getAllConfigurationFields();

    /**
     * Loads the offset to resume from.
     *
     * <p>The offset remembered by {@link #commitRecord(SourceRecord)} takes precedence; otherwise
     * the offset stored for the loader's partition is read through the task context.
     *
     * @param loader supplies the partition and converts a raw offset map into an {@link OffsetContext}
     * @return the loaded offset context, or {@code null} if no offset is stored for the partition
     */
    protected OffsetContext getPreviousOffset(OffsetContext.Loader loader) {
        Map<String, ?> partition = loader.getPartition();
        // Read the volatile field once so the null check and the load see the same value.
        Map<String, ?> rememberedOffset = this.lastOffset;
        if (rememberedOffset != null) {
            OffsetContext restored = loader.load(rememberedOffset);
            LOGGER.info("Found previous offset after restart {}", restored);
            return restored;
        }
        Map<String, ?> storedOffset = this.context.offsetStorageReader().offsets(Collections.singleton(partition)).get(partition);
        if (storedOffset == null) {
            return null;
        }
        OffsetContext loaded = loader.load(storedOffset);
        LOGGER.info("Found previous offset {}", loaded);
        return loaded;
    }
}
