package io.debezium.connector.mysql;

import io.debezium.connector.mysql.RecordMakers;
import io.debezium.function.BlockingConsumer;
import io.debezium.function.BufferedBlockingConsumer;
import io.debezium.function.Predicates;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;

/* loaded from: input_file:io/debezium/connector/mysql/SnapshotReader.class */
public class SnapshotReader extends AbstractReader {
    private boolean minimalBlocking;
    private RecordRecorder recorder;
    private volatile Thread thread;
    private volatile Runnable onSuccessfulCompletion;

    /* loaded from: input_file:io/debezium/connector/mysql/SnapshotReader$RecordRecorder.class */
    protected interface RecordRecorder {
        void recordRow(RecordMakers.RecordsForTable recordsForTable, Object[] objArr, long j) throws InterruptedException;
    }

    public SnapshotReader(MySqlTaskContext mySqlTaskContext) {
        super(mySqlTaskContext);
        this.minimalBlocking = true;
        this.recorder = this::recordRowAsRead;
    }

    public SnapshotReader onSuccessfulCompletion(Runnable runnable) {
        this.onSuccessfulCompletion = runnable;
        return this;
    }

    public SnapshotReader useMinimalBlocking(boolean z) {
        this.minimalBlocking = z;
        return this;
    }

    public SnapshotReader generateReadEvents() {
        this.recorder = this::recordRowAsRead;
        return this;
    }

    public SnapshotReader generateInsertEvents() {
        this.recorder = this::recordRowAsInsert;
        return this;
    }

    @Override // io.debezium.connector.mysql.AbstractReader
    protected void doStart() {
        this.thread = new Thread(this::execute, "mysql-snapshot-" + this.context.serverName());
        this.thread.start();
    }

    @Override // io.debezium.connector.mysql.AbstractReader
    protected void doStop() {
        this.thread.interrupt();
    }

    @Override // io.debezium.connector.mysql.AbstractReader
    protected void doCleanup() {
        this.thread = null;
        this.logger.trace("Completed writing all snapshot records");
        try {
            if (this.onSuccessfulCompletion != null) {
                this.onSuccessfulCompletion.run();
            }
        } catch (RuntimeException e) {
            throw e;
        } catch (Throwable th) {
            throw new ConnectException("Error calling completion function after completing snapshot", th);
        }
    }

    protected void execute() {
        this.context.configureLoggingContext(SourceInfo.SNAPSHOT_KEY);
        AtomicReference<String> atomicReference = new AtomicReference<>();
        JdbcConnection jdbc = this.context.jdbc();
        MySqlSchema dbSchema = this.context.dbSchema();
        Filters filters = dbSchema.filters();
        SourceInfo source = this.context.source();
        Clock clock = this.context.clock();
        long currentTimeInMillis = clock.currentTimeInMillis();
        this.logger.info("Starting snapshot for {} with user '{}'", this.context.connectionString(), jdbc.username());
        logRolesForCurrentUser(jdbc);
        logServerInformation(jdbc);
        try {
            this.logger.info("Step 0: disabling autocommit and enabling repeatable read transactions");
            jdbc.setAutoCommit(false);
            atomicReference.set("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
            jdbc.execute(new String[]{atomicReference.get()});
            String statementFor = this.context.setStatementFor(this.context.readMySqlCharsetSystemVariables(atomicReference));
            this.logger.info("Step 1: start transaction with consistent snapshot");
            atomicReference.set("START TRANSACTION WITH CONSISTENT SNAPSHOT");
            jdbc.execute(new String[]{atomicReference.get()});
            long currentTimeInMillis2 = clock.currentTimeInMillis();
            this.logger.info("Step 2: flush and obtain global read lock (preventing writes to database)");
            atomicReference.set("FLUSH TABLES WITH READ LOCK");
            jdbc.execute(new String[]{atomicReference.get()});
            this.logger.info("Step 3: read binlog position of MySQL master");
            String str = "SHOW MASTER STATUS";
            atomicReference.set("SHOW MASTER STATUS");
            jdbc.query(atomicReference.get(), resultSet -> {
                if (!resultSet.next()) {
                    throw new IllegalStateException("Cannot read the binlog filename and position via '" + str + "'. Make sure your server is correctly configured");
                }
                String string = resultSet.getString(1);
                long j = resultSet.getLong(2);
                source.setBinlogStartPoint(string, j);
                if (resultSet.getMetaData().getColumnCount() > 4) {
                    String string2 = resultSet.getString(5);
                    source.setGtidSet(string2);
                    this.logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", new Object[]{string, Long.valueOf(j), string2});
                } else {
                    this.logger.info("\t using binlog '{}' at position '{}'", string, Long.valueOf(j));
                }
                source.startSnapshot();
            });
            this.logger.info("Step 4: read list of available databases");
            ArrayList<String> arrayList = new ArrayList();
            atomicReference.set("SHOW DATABASES");
            jdbc.query(atomicReference.get(), resultSet2 -> {
                while (resultSet2.next()) {
                    arrayList.add(resultSet2.getString(1));
                }
            });
            this.logger.info("\t list of available databases is: {}", arrayList);
            this.logger.info("Step 5: read list of available tables in each database");
            ArrayList<TableId> arrayList2 = new ArrayList();
            HashMap hashMap = new HashMap();
            for (String str2 : arrayList) {
                atomicReference.set("SHOW TABLES IN " + str2);
                jdbc.query(atomicReference.get(), resultSet3 -> {
                    while (resultSet3.next()) {
                        TableId tableId = new TableId(str2, (String) null, resultSet3.getString(1));
                        if (filters.tableFilter().test(tableId)) {
                            arrayList2.add(tableId);
                            ((List) hashMap.computeIfAbsent(str2, str3 -> {
                                return new ArrayList();
                            })).add(tableId);
                            this.logger.info("\t including '{}'", tableId);
                        } else {
                            this.logger.info("\t '{}' is filtered out, discarding", tableId);
                        }
                    }
                });
            }
            this.logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas:");
            dbSchema.applyDdl(source, null, statementFor, this::enqueueSchemaChanges);
            HashSet hashSet = new HashSet(dbSchema.tables().tableIds());
            hashSet.addAll(arrayList2);
            hashSet.forEach(tableId -> {
                dbSchema.applyDdl(source, tableId.schema(), "DROP TABLE IF EXISTS " + tableId, this::enqueueSchemaChanges);
            });
            Stream map = dbSchema.tables().tableIds().stream().map((v0) -> {
                return v0.catalog();
            });
            arrayList.getClass();
            map.filter(Predicates.not((v1) -> {
                return r1.contains(v1);
            })).forEach(str3 -> {
                dbSchema.applyDdl(source, str3, "DROP DATABASE IF EXISTS " + str3, this::enqueueSchemaChanges);
            });
            for (Map.Entry entry : hashMap.entrySet()) {
                String str4 = (String) entry.getKey();
                dbSchema.applyDdl(source, str4, "DROP DATABASE IF EXISTS " + str4, this::enqueueSchemaChanges);
                dbSchema.applyDdl(source, str4, "CREATE DATABASE " + str4, this::enqueueSchemaChanges);
                dbSchema.applyDdl(source, str4, "USE " + str4, this::enqueueSchemaChanges);
                Iterator it = ((List) entry.getValue()).iterator();
                while (it.hasNext()) {
                    atomicReference.set("SHOW CREATE TABLE " + ((TableId) it.next()));
                    jdbc.query(atomicReference.get(), resultSet4 -> {
                        if (resultSet4.next()) {
                            dbSchema.applyDdl(source, str4, resultSet4.getString(2), this::enqueueSchemaChanges);
                        }
                    });
                }
            }
            this.context.makeRecord().regenerate();
            boolean z = false;
            if (this.minimalBlocking) {
                this.logger.info("Step 7: releasing global read lock to enable MySQL writes");
                atomicReference.set("UNLOCK TABLES");
                jdbc.execute(new String[]{atomicReference.get()});
                z = true;
                this.logger.info("Step 7: blocked writes to MySQL for a total of {}", Strings.duration(clock.currentTimeInMillis() - currentTimeInMillis2));
            }
            BlockingConsumer<SourceRecord> bufferLast = BufferedBlockingConsumer.bufferLast(sourceRecord -> {
                super.enqueueRecord(sourceRecord);
            });
            this.logger.info("Step 8: scanning contents of {} tables", Integer.valueOf(arrayList2.size()));
            long currentTimeInMillis3 = clock.currentTimeInMillis();
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            AtomicLong atomicLong = new AtomicLong();
            int i = 0;
            int i2 = 0;
            long rowCountForLargeTable = this.context.rowCountForLargeTable();
            for (TableId tableId2 : arrayList2) {
                RecordMakers.RecordsForTable forTable = this.context.makeRecord().forTable(tableId2, (BitSet) null, bufferLast);
                if (forTable != null) {
                    atomicReference.set("SELECT COUNT(*) FROM " + tableId2);
                    AtomicLong atomicLong2 = new AtomicLong();
                    jdbc.query(atomicReference.get(), resultSet5 -> {
                        if (resultSet5.next()) {
                            atomicLong2.set(resultSet5.getLong(1));
                        }
                    });
                    JdbcConnection.StatementFactory statementFactory = this::createStatement;
                    if (atomicLong2.get() > rowCountForLargeTable) {
                        statementFactory = this::createStatementWithLargeResultSet;
                    }
                    long currentTimeInMillis4 = clock.currentTimeInMillis();
                    i++;
                    this.logger.info("Step 8: - scanning table '{}' ({} of {} tables)", new Object[]{tableId2, Integer.valueOf(i), Integer.valueOf(arrayList2.size())});
                    atomicReference.set("SELECT * FROM " + tableId2);
                    jdbc.query(atomicReference.get(), statementFactory, resultSet6 -> {
                        long j = 0;
                        long j2 = atomicLong2.get();
                        try {
                            try {
                                int size = dbSchema.tableFor(tableId2).columns().size();
                                Object[] objArr = new Object[size];
                                while (resultSet6.next()) {
                                    int i3 = 0;
                                    int i4 = 1;
                                    while (i3 != size) {
                                        objArr[i3] = resultSet6.getObject(i4);
                                        i3++;
                                        i4++;
                                    }
                                    this.recorder.recordRow(forTable, objArr, currentTimeInMillis);
                                    j++;
                                    if (j % 10000 == 0 || j == j2) {
                                        this.logger.info("Step 8: - {} of {} rows scanned from table '{}' after {}", new Object[]{Long.valueOf(j), Long.valueOf(j2), tableId2, Strings.duration(clock.currentTimeInMillis() - currentTimeInMillis4)});
                                    }
                                }
                                atomicLong.addAndGet(j2);
                            } catch (InterruptedException e) {
                                Thread.interrupted();
                                this.logger.info("Step 8: Stopping the snapshot due to thread interruption");
                                atomicBoolean.set(true);
                                atomicLong.addAndGet(j2);
                            }
                        } catch (Throwable th) {
                            atomicLong.addAndGet(j2);
                            throw th;
                        }
                    });
                    if (atomicBoolean.get()) {
                        break;
                    }
                }
                i2++;
            }
            source.markLastSnapshot();
            long currentTimeInMillis5 = clock.currentTimeInMillis();
            try {
                bufferLast.flush(this::replaceOffset);
                this.logger.info("Step 8: scanned {} rows in {} tables in {}", new Object[]{atomicLong, Integer.valueOf(arrayList2.size()), Strings.duration(currentTimeInMillis5 - currentTimeInMillis3)});
            } catch (InterruptedException e) {
                Thread.interrupted();
                this.logger.info("Step 8: aborting the snapshot after {} rows in {} of {} tables {}", new Object[]{atomicLong, Integer.valueOf(i2), Integer.valueOf(arrayList2.size()), Strings.duration(currentTimeInMillis5 - currentTimeInMillis3)});
                atomicBoolean.set(true);
            }
            int i3 = 9;
            if (!z) {
                i3 = 9 + 1;
                this.logger.info("Step {}: releasing global read lock to enable MySQL writes", 9);
                atomicReference.set("UNLOCK TABLES");
                jdbc.execute(new String[]{atomicReference.get()});
                this.logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(clock.currentTimeInMillis() - currentTimeInMillis2));
            }
            if (atomicBoolean.get()) {
                int i4 = i3;
                int i5 = i3 + 1;
                this.logger.info("Step {}: rolling back transaction after abort", Integer.valueOf(i4));
                atomicReference.set("ROLLBACK");
                jdbc.execute(new String[]{atomicReference.get()});
                return;
            }
            int i6 = i3;
            int i7 = i3 + 1;
            this.logger.info("Step {}: committing transaction", Integer.valueOf(i6));
            atomicReference.set("COMMIT");
            jdbc.execute(new String[]{atomicReference.get()});
            try {
                source.completeSnapshot();
                super.completeSuccessfully();
                this.logger.info("Completed snapshot in {}", Strings.duration(clock.currentTimeInMillis() - currentTimeInMillis));
            } catch (Throwable th) {
                super.completeSuccessfully();
                this.logger.info("Completed snapshot in {}", Strings.duration(clock.currentTimeInMillis() - currentTimeInMillis));
                throw th;
            }
        } catch (Throwable th2) {
            failed(th2, "Aborting snapshot after running '" + atomicReference.get() + "': " + th2.getMessage());
        }
    }

    private Statement createStatementWithLargeResultSet(Connection connection) throws SQLException {
        Statement createStatement = connection.createStatement(1003, 1007);
        createStatement.setFetchSize(Integer.MIN_VALUE);
        return createStatement;
    }

    private Statement createStatement(Connection connection) throws SQLException {
        return connection.createStatement();
    }

    private void logServerInformation(JdbcConnection jdbcConnection) {
        try {
            this.logger.info("MySQL server variables related to change data capture:");
            jdbcConnection.query("SHOW VARIABLES WHERE Variable_name REGEXP 'version|binlog|tx_|gtid|character_set|collation'", resultSet -> {
                while (resultSet.next()) {
                    this.logger.info("\t{} = {}", Strings.pad(resultSet.getString(1), 45, ' '), Strings.pad(resultSet.getString(2), 45, ' '));
                }
            });
        } catch (SQLException e) {
            this.logger.info("Cannot determine MySql server version", e);
        }
    }

    private void logRolesForCurrentUser(JdbcConnection jdbcConnection) {
        try {
            ArrayList arrayList = new ArrayList();
            jdbcConnection.query("SHOW GRANTS FOR CURRENT_USER", resultSet -> {
                while (resultSet.next()) {
                    arrayList.add(resultSet.getString(1));
                }
            });
            if (arrayList.isEmpty()) {
                this.logger.warn("Snapshot is using user '{}' but it likely doesn't have proper privileges. If tables are missing or are empty, ensure connector is configured with the correct MySQL user and/or ensure that the MySQL user has the required privileges.", jdbcConnection.username());
            } else {
                this.logger.info("Snapshot is using user '{}' with these MySQL grants:", jdbcConnection.username());
                arrayList.forEach(str -> {
                    this.logger.info("\t{}", str);
                });
            }
        } catch (SQLException e) {
            this.logger.info("Cannot determine the privileges for '{}' ", jdbcConnection.username(), e);
        }
    }

    protected SourceRecord replaceOffset(SourceRecord sourceRecord) {
        if (sourceRecord == null) {
            return null;
        }
        return new SourceRecord(sourceRecord.sourcePartition(), this.context.source().offset(), sourceRecord.topic(), sourceRecord.kafkaPartition(), sourceRecord.keySchema(), sourceRecord.key(), sourceRecord.valueSchema(), sourceRecord.value());
    }

    protected void enqueueSchemaChanges(String str, String str2) {
        if (!this.context.includeSchemaChangeRecords() || str2.length() == 0 || this.context.makeRecord().schemaChanges(str, str2, sourceRecord -> {
            super.enqueueRecord(sourceRecord);
        }) <= 0) {
            return;
        }
        this.logger.info("\t{}", str2);
    }

    protected void recordRowAsRead(RecordMakers.RecordsForTable recordsForTable, Object[] objArr, long j) throws InterruptedException {
        recordsForTable.read(objArr, j);
    }

    protected void recordRowAsInsert(RecordMakers.RecordsForTable recordsForTable, Object[] objArr, long j) throws InterruptedException {
        recordsForTable.create(objArr, j);
    }
}
