package io.debezium.connector.postgresql;

import io.debezium.config.Configuration;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.time.Temporals;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;
import io.debezium.util.Threads;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/postgresql/PostgresConnectorTask.class */
/**
 * Kafka Connect {@link SourceTask} for the Postgres connector.
 *
 * <p>On {@link #start(Map)} the task validates its configuration, picks a {@link RecordsProducer}
 * (snapshot and/or streaming) based on the configuration and any previously stored offset, and
 * starts it. The producer hands {@link ChangeEvent}s to {@link #enqueueRecord(ChangeEvent)}, which
 * places them on a bounded queue; {@link #poll()} drains that queue in batches.
 */
public class PostgresConnectorTask extends SourceTask {
    /** Name handed to the task context whenever this task configures its logging context. */
    private static final String CONTEXT_NAME = "postgres-connector-task";

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** {@code true} between a successful {@link #start(Map)} and the following {@link #stop()}. */
    private final AtomicBoolean running = new AtomicBoolean(false);

    private PostgresTaskContext taskContext;

    /** Bounded hand-off buffer: filled by {@link #enqueueRecord(ChangeEvent)}, drained by {@link #poll()}. */
    private BlockingQueue<ChangeEvent> queue;

    /** Upper bound on the number of events drained from the queue per {@link #poll()} attempt. */
    private int maxBatchSize;

    private RecordsProducer producer;

    /** Paces the wait between two drain attempts in {@link #poll()}. */
    private Metronome metronome;

    private Duration pollInterval;

    /** LSN taken from the most recent polled event flagged as last-of-LSN; passed to the producer on {@link #commit()}. */
    private volatile long lastProcessedLsn;

    /**
     * Validates the configuration, selects the appropriate records producer and starts it.
     * Does nothing if the task is already running.
     *
     * @param map the raw task configuration
     * @throws ConnectException if the task has no context, the configuration is invalid, a
     *         {@link SQLException} occurs during start-up, or a previously interrupted snapshot
     *         is found while snapshots are disallowed
     */
    public void start(Map<String, String> map) {
        if (this.running.get()) {
            return;
        }
        if (this.context == null) {
            throw new ConnectException("Unexpected null context");
        }

        PostgresConnectorConfig config = new PostgresConnectorConfig(Configuration.from(map));
        if (!config.validateAndRecord(this.logger::error)) {
            throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }

        this.taskContext = new PostgresTaskContext(config, new PostgresSchema(config));
        this.queue = new LinkedBlockingDeque<>(config.maxQueueSize());
        this.maxBatchSize = config.maxBatchSize();

        SourceInfo sourceInfo = new SourceInfo(config.serverName());
        Map<String, Object> offset = this.context.offsetStorageReader().offset(sourceInfo.partition());

        LoggingContext.PreviousContext previousContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            // The connection is only needed to log the server information, so it is closed
            // (exactly once) before the producer is created.
            try (PostgresConnection connection = this.taskContext.createConnection()) {
                this.logger.info(connection.serverInfo().toString());
            }

            selectProducer(config, sourceInfo, offset);

            this.metronome = Metronome.sleeper(config.pollIntervalMs(), TimeUnit.MILLISECONDS, Clock.SYSTEM);
            this.pollInterval = Duration.ofMillis(config.pollIntervalMs());
            this.producer.start(this::enqueueRecord);
            this.running.compareAndSet(false, true);
        } catch (SQLException e) {
            throw new ConnectException(e);
        } finally {
            previousContext.restore();
        }
    }

    /**
     * Assigns {@link #producer} according to the configuration and the stored offset, if any.
     *
     * @param config the validated connector configuration
     * @param sourceInfo the source info for this server; loaded from {@code offset} when one exists
     * @param offset the previously stored offset, or {@code null} if none was found
     * @throws ConnectException if the stored offset indicates an unfinished snapshot but the
     *         configuration never allows snapshots
     */
    private void selectProducer(PostgresConnectorConfig config, SourceInfo sourceInfo, Map<String, Object> offset) {
        if (offset == null) {
            this.logger.info("No previous offset found");
            if (config.snapshotNeverAllowed()) {
                this.logger.info("Snapshots are not allowed as per configuration, starting streaming logical changes only");
                this.producer = new RecordsStreamProducer(this.taskContext, sourceInfo);
            } else {
                createSnapshotProducer(this.taskContext, sourceInfo, config.initialOnlySnapshot());
            }
            return;
        }

        sourceInfo.load(offset);
        this.logger.info("Found previous offset {}", sourceInfo);
        if (sourceInfo.isSnapshotInEffect()) {
            if (config.snapshotNeverAllowed()) {
                throw new ConnectException("The connector previously stopped while taking a snapshot, but now the connector is configured to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.");
            }
            this.logger.info("Found previous incomplete snapshot");
            createSnapshotProducer(this.taskContext, sourceInfo, config.initialOnlySnapshot());
        } else if (config.alwaysTakeSnapshot()) {
            this.logger.info("Taking a new snapshot as per configuration");
            this.producer = new RecordsSnapshotProducer(this.taskContext, sourceInfo, true);
        } else {
            this.logger.info("Previous snapshot has completed successfully, streaming logical changes from last known position");
            this.producer = new RecordsStreamProducer(this.taskContext, sourceInfo);
        }
    }

    /**
     * Callback given to the producer: puts one event on the queue, blocking while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the event is not enqueued and the
     * thread's interrupt flag is set again so the caller can observe the interruption.
     *
     * @param changeEvent the event to enqueue
     */
    private void enqueueRecord(ChangeEvent changeEvent) {
        LoggingContext.PreviousContext previousContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            this.queue.put(changeEvent);
            this.logger.debug("Placed source record '{}' into queue", changeEvent);
        } catch (InterruptedException e) {
            this.logger.debug("received interrupt request");
            // put() cleared the flag when it threw; re-assert it instead of discarding the signal.
            Thread.currentThread().interrupt();
        } finally {
            previousContext.restore();
        }
    }

    /**
     * Creates a snapshot producer and assigns it to {@link #producer}.
     *
     * @param postgresTaskContext the task context handed to the producer
     * @param sourceInfo the source info handed to the producer
     * @param snapshotOnly {@code true} to take a snapshot without streaming changes afterwards,
     *        {@code false} to continue with streaming once the snapshot is finished
     */
    private void createSnapshotProducer(PostgresTaskContext postgresTaskContext, SourceInfo sourceInfo, boolean snapshotOnly) {
        this.logger.info(snapshotOnly
                ? "Taking only a snapshot of the DB without streaming any changes afterwards..."
                : "Taking a new snapshot of the DB and streaming logical changes once the snapshot is finished...");
        // The producer's last argument is the inverse of snapshotOnly.
        this.producer = new RecordsSnapshotProducer(postgresTaskContext, sourceInfo, !snapshotOnly);
    }

    /**
     * Forwards the last processed LSN to the producer, provided the task is running.
     *
     * @throws InterruptedException declared by the overridden method
     */
    public void commit() throws InterruptedException {
        if (this.running.get()) {
            this.producer.commit(this.lastProcessedLsn);
        }
    }

    /**
     * Drains up to {@code maxBatchSize} queued events and returns their source records.
     *
     * <p>While the queue is empty and the task is running, the method pauses on the metronome
     * between attempts and gives up once the timer (the larger of the poll interval and
     * {@link ConfigurationDefaults#RETURN_CONTROL_INTERVAL}) has expired, returning an empty list.
     *
     * @return the records of the drained events, possibly empty
     * @throws ConnectException if the task context reports a failure while waiting for records
     * @throws InterruptedException if the thread is interrupted while waiting for records
     */
    public List<SourceRecord> poll() throws InterruptedException {
        LoggingContext.PreviousContext previousContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            this.logger.debug("polling records...");
            List<ChangeEvent> events = new ArrayList<>();
            Threads.Timer timer = Threads.timer(Clock.SYSTEM, Temporals.max(this.pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
            while (this.running.get() && this.queue.drainTo(events, this.maxBatchSize) == 0) {
                if (this.taskContext.getTaskFailure() != null) {
                    throw new ConnectException(this.taskContext.getTaskFailure());
                }
                this.logger.debug("no records available yet, sleeping a bit...");
                // An interrupt here propagates to the caller, as the method signature declares,
                // rather than being cleared and ignored.
                this.metronome.pause();
                if (timer.expired()) {
                    break;
                }
                this.logger.debug("checking for more records...");
            }
            recordLastProcessedLsn(events);
            return events.stream().map(ChangeEvent::getRecord).collect(Collectors.toList());
        } finally {
            previousContext.restore();
        }
    }

    /**
     * Scans the batch from newest to oldest and stores the LSN of the first event that is flagged
     * as last-of-LSN and carries an LSN in its source offset. Leaves {@link #lastProcessedLsn}
     * untouched if no such event exists.
     *
     * @param events the batch just drained from the queue
     */
    private void recordLastProcessedLsn(List<ChangeEvent> events) {
        for (int i = events.size() - 1; i >= 0; i--) {
            ChangeEvent event = events.get(i);
            if (!event.isLastOfLsn()) {
                continue;
            }
            Long lsn = (Long) event.getRecord().sourceOffset().get(SourceInfo.LSN_KEY);
            // Guard against an offset without an LSN entry: fall back to an earlier event
            // instead of failing on unboxing null.
            if (lsn != null) {
                this.lastProcessedLsn = lsn;
                return;
            }
        }
    }

    /** Stops the producer if the task is currently running; otherwise does nothing. */
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            this.producer.stop();
        }
    }

    /**
     * @return the version reported by {@link Module#version()}
     */
    public String version() {
        return Module.version();
    }
}
