package io.debezium.connector.mysql;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.XidEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.network.ServerException;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.util.Testing;
import java.io.IOException;
import java.io.Serializable;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mysql/ReadBinLogIT.class */
public class ReadBinLogIT implements Testing {
    /** Logger shared with the nested lifecycle listener. */
    protected static final Logger LOGGER = LoggerFactory.getLogger(ReadBinLogIT.class);

    /** Default time, in milliseconds, to wait for the client to connect and for expected events to arrive. */
    protected static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);

    /** Wildcard marker returned by {@link #any()}; {@link RowBuilder#deepEquals} treats it as matching any value. */
    private static final Serializable ANY_OBJECT = new AnyValue();

    /** Queue-backed listener used by the tests to wait for specific events. */
    private EventQueue counters;

    /** Binlog client under test; created by {@code startClient} and disconnected in {@code afterEach}. */
    private BinaryLogClient client;

    /** JDBC connection used to issue the statements whose binlog events are then observed. */
    private MySQLConnection conn;

    /**
     * Every event seen by {@link #recordEvent(Event)}. The list doubles as the monitor guarding its own
     * contents, so the reference is final: a reassigned lock object would silently stop excluding anything.
     */
    private final List<Event> events = new LinkedList<>();

    /** Configuration obtained from {@link #conn}; supplies the host and port for the binlog client. */
    private JdbcConfiguration config;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/connector/mysql/ReadBinLogIT$AnyValue.class */
    /**
     * Marker type for the "matches anything" placeholder used in expected rows.
     * {@code RowBuilder.deepEquals} checks {@code instanceof AnyValue} on each expected value and,
     * when it is one, skips comparing that position against the actual row.
     */
    public static final class AnyValue implements Serializable {
        private static final long serialVersionUID = 1;

        // Private constructor: the only instance visible in this file is the ANY_OBJECT constant.
        private AnyValue() {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/debezium/connector/mysql/ReadBinLogIT$EventQueue.class */
    /**
     * Event listener that buffers every received event in a thread-safe queue so that a test can
     * wait for (and drain) the events it expects.
     *
     * <p>All {@code consume} methods busy-poll the queue until either the requested number of
     * matching events has been seen or the timeout elapses. Events that are polled but do not match
     * are handed to the "ignored" callback and are <em>not</em> put back on the queue.
     */
    public static class EventQueue implements BinaryLogClient.EventListener {
        private final ConcurrentLinkedQueue<Event> queue = new ConcurrentLinkedQueue<>();
        private final Consumer<Event> consumedEvents;
        private final Consumer<Event> ignoredEvents;
        private final long defaultTimeoutInMillis;

        /**
         * @param j         default timeout, in milliseconds, used by the {@code consume} overloads
         *                  that take no explicit timeout
         * @param consumer  callback invoked for each event that is consumed; may be {@code null},
         *                  in which case consumed events are silently dropped
         * @param consumer2 callback invoked for each polled event that did not match; may be
         *                  {@code null}, in which case ignored events are silently dropped
         */
        public EventQueue(long j, Consumer<Event> consumer, Consumer<Event> consumer2) {
            this.defaultTimeoutInMillis = j;
            this.consumedEvents = consumer != null ? consumer : this::defaultEventHandler;
            this.ignoredEvents = consumer2 != null ? consumer2 : this::defaultEventHandler;
        }

        /** No-op callback substituted when the caller supplies no handler. */
        private void defaultEventHandler(Event event) {
        }

        /**
         * Enqueues the received event.
         *
         * @param event the event delivered to this listener
         */
        public void onEvent(Event event) {
            boolean offered = this.queue.offer(event);
            // The queue is unbounded, so a rejected offer would indicate a programming error.
            assert offered;
        }

        /**
         * Drains and reports every event that arrives during the given period. This method always
         * runs for the full duration; it never finishes early.
         *
         * @param j        how long to keep draining
         * @param timeUnit unit of {@code j}
         * @throws TimeoutException never thrown by this implementation; declared for callers
         */
        public void consumeAll(long j, TimeUnit timeUnit) throws TimeoutException {
            long deadline = System.currentTimeMillis() + timeUnit.toMillis(j);
            while (System.currentTimeMillis() < deadline) {
                Event next = this.queue.poll();
                if (next != null) {
                    Testing.print("Found event: " + next);
                    this.consumedEvents.accept(next);
                }
            }
        }

        /**
         * Consumes {@code i} events accepted by the predicate, using the default timeout.
         *
         * @param i         number of matching events to wait for
         * @param predicate decides whether a polled event counts as a match
         * @throws TimeoutException if fewer than {@code i} matching events arrive in time
         */
        public void consume(int i, Predicate<Event> predicate) throws TimeoutException {
            consume(i, this.defaultTimeoutInMillis, predicate);
        }

        /**
         * Consumes {@code i} events accepted by the predicate, waiting at most {@code j} milliseconds.
         * Non-matching events polled along the way are passed to the ignored-event callback and discarded.
         *
         * @param i         number of matching events to wait for; {@code 0} returns immediately
         * @param j         timeout in milliseconds
         * @param predicate decides whether a polled event counts as a match
         * @throws IllegalArgumentException if {@code i} is negative
         * @throws TimeoutException         if fewer than {@code i} matching events arrive in time
         */
        public void consume(int i, long j, Predicate<Event> predicate) throws TimeoutException {
            if (i < 0) {
                throw new IllegalArgumentException("The eventCount may not be negative");
            }
            if (i == 0) {
                return;
            }
            int remaining = i;
            long deadline = System.currentTimeMillis() + j;
            while (remaining > 0 && System.currentTimeMillis() < deadline) {
                Event next = this.queue.poll();
                if (next == null) {
                    continue;
                }
                if (predicate.test(next)) {
                    remaining--;
                    this.consumedEvents.accept(next);
                } else {
                    this.ignoredEvents.accept(next);
                }
            }
            if (remaining > 0) {
                throw new TimeoutException("Received " + (i - remaining) + " of " + i + " in " + j + "ms");
            }
        }

        /**
         * Consumes {@code i} events whose header carries the given type, using the default timeout.
         *
         * @param i         number of matching events to wait for
         * @param eventType required event type
         * @throws TimeoutException if fewer than {@code i} matching events arrive in time
         */
        public void consume(int i, EventType eventType) throws TimeoutException {
            consume(i, eventType, this.defaultTimeoutInMillis);
        }

        /**
         * Consumes {@code i} events whose header carries the given type. Events without a header
         * never match.
         *
         * @param i         number of matching events to wait for
         * @param eventType required event type
         * @param j         timeout in milliseconds
         * @throws TimeoutException if fewer than {@code i} matching events arrive in time
         */
        public void consume(int i, EventType eventType, long j) throws TimeoutException {
            // Honor the caller's timeout; previously the default timeout was always used here.
            consume(i, j, event -> {
                EventHeader header = event.getHeader();
                return eventType.equals(header == null ? null : header.getEventType());
            });
        }

        /**
         * Consumes {@code i} events whose data object is exactly of the given class, using the
         * default timeout.
         *
         * @param i   number of matching events to wait for
         * @param cls exact class the event data must have
         * @throws TimeoutException if fewer than {@code i} matching events arrive in time
         */
        public void consume(int i, Class<? extends EventData> cls) throws TimeoutException {
            consume(i, cls, this.defaultTimeoutInMillis);
        }

        /**
         * Consumes {@code i} events whose data object is exactly of the given class (subclasses do
         * not match). Events without data never match.
         *
         * @param i   number of matching events to wait for
         * @param cls exact class the event data must have
         * @param j   timeout in milliseconds
         * @throws TimeoutException if fewer than {@code i} matching events arrive in time
         */
        public void consume(int i, Class<? extends EventData> cls, long j) throws TimeoutException {
            // Honor the caller's timeout; previously the default timeout was always used here.
            consume(i, j, event -> {
                EventData data = event.getData();
                return data != null && data.getClass().equals(cls);
            });
        }

        /** Discards every event currently buffered in the queue. */
        public void reset() {
            this.queue.clear();
        }
    }

    /* loaded from: input_file:io/debezium/connector/mysql/ReadBinLogIT$Row.class */
    /**
     * Expected before/after values for one row, as assembled by {@code RowBuilder}.
     */
    public static class Row {
        // Values compared by RowBuilder.findDeletedRow and findUpdatedRow; an empty array for inserted rows.
        public Serializable[] fromValues;
        // Values compared by RowBuilder.findInsertedRow and findUpdatedRow; set through UpdateBuilder.to(...).
        public Serializable[] toValues;
    }

    /* loaded from: input_file:io/debezium/connector/mysql/ReadBinLogIT$RowBuilder.class */
    /**
     * Fluent builder for the set of rows a test expects to find in a row-change event.
     *
     * <p>A row under construction is held in {@code nextRow} and is moved to the finished list the
     * next time any builder or lookup method is called. Each {@code find...} method removes the
     * first expected row that matches, so one expected row can satisfy only one actual row.
     */
    public static class RowBuilder {
        private List<Row> rows = new ArrayList<>();
        private Row nextRow = null;

        /**
         * Adds an expected inserted row: no "from" values, and the given "to" values.
         *
         * @param serializableArr expected column values of the new row
         * @return this builder
         */
        public RowBuilder insertedRow(Serializable... serializableArr) {
            maybeAddRow();
            Serializable[] noPriorValues = new Serializable[0];
            UpdateBuilder pending = changeRow(noPriorValues);
            return pending.to(serializableArr);
        }

        /**
         * Adds an expected removed row; the same values are stored as both "from" and "to".
         *
         * @param serializableArr expected column values of the removed row
         * @return this builder
         */
        public RowBuilder removedRow(Serializable... serializableArr) {
            maybeAddRow();
            UpdateBuilder pending = changeRow(serializableArr);
            return pending.to(serializableArr);
        }

        /**
         * Starts an expected updated row with the given "from" values.
         *
         * @param serializableArr expected column values before the change
         * @return a step on which {@code to(...)} must be called to supply the values after the change
         */
        public UpdateBuilder changeRow(Serializable... serializableArr) {
            maybeAddRow();
            Row pendingRow = new Row();
            pendingRow.fromValues = serializableArr;
            this.nextRow = pendingRow;
            // The field is read again when to(...) runs, not captured now.
            return afterValues -> {
                this.nextRow.toValues = afterValues;
                return this;
            };
        }

        /** Moves the row under construction, if any, to the list of finished rows. */
        protected void maybeAddRow() {
            if (this.nextRow == null) {
                return;
            }
            this.rows.add(this.nextRow);
            this.nextRow = null;
        }

        /**
         * @return the live list of expected rows that have not been matched yet
         */
        protected List<Row> rows() {
            maybeAddRow();
            return this.rows;
        }

        /**
         * Removes the first expected row whose "to" values match the given actual values.
         *
         * @param serializableArr actual values of an inserted row
         * @return {@code true} if a matching expected row was found and removed
         */
        protected boolean findInsertedRow(Serializable[] serializableArr) {
            return removeFirstMatching(expected -> deepEquals(expected.toValues, serializableArr));
        }

        /**
         * Removes the first expected row whose "from" values match the given actual values.
         *
         * @param serializableArr actual values of a deleted row
         * @return {@code true} if a matching expected row was found and removed
         */
        protected boolean findDeletedRow(Serializable[] serializableArr) {
            return removeFirstMatching(expected -> deepEquals(expected.fromValues, serializableArr));
        }

        /**
         * Removes the first expected row whose "from" and "to" values both match.
         *
         * @param serializableArr  actual values before the update
         * @param serializableArr2 actual values after the update
         * @return {@code true} if a matching expected row was found and removed
         */
        protected boolean findUpdatedRow(Serializable[] serializableArr, Serializable[] serializableArr2) {
            return removeFirstMatching(expected -> deepEquals(expected.fromValues, serializableArr)
                    && deepEquals(expected.toValues, serializableArr2));
        }

        /** Flushes the pending row, then removes the first finished row accepted by the matcher. */
        private boolean removeFirstMatching(Predicate<Row> matcher) {
            maybeAddRow();
            for (Iterator<Row> cursor = this.rows.iterator(); cursor.hasNext();) {
                if (matcher.test(cursor.next())) {
                    cursor.remove();
                    return true;
                }
            }
            return false;
        }

        /**
         * Compares expected against actual values, treating every {@link AnyValue} in the expected
         * array as a wildcard for that position.
         *
         * @param serializableArr  expected values, possibly containing wildcards
         * @param serializableArr2 actual values; not modified
         * @return {@code true} if all non-wildcard positions are deeply equal
         * @throws AssertionError presumably raised by the FEST length assertion when the two arrays
         *                        differ in length (confirm against FEST)
         */
        protected boolean deepEquals(Serializable[] serializableArr, Serializable[] serializableArr2) {
            Assertions.assertThat(serializableArr.length).isEqualTo(serializableArr2.length);
            // Work on a copy so that wildcard positions can be overwritten with the wildcard itself.
            Serializable[] comparable = Arrays.copyOf(serializableArr2, serializableArr2.length);
            for (int index = 0; index < comparable.length; index++) {
                Serializable expected = serializableArr[index];
                if (expected instanceof AnyValue) {
                    comparable[index] = expected;
                }
            }
            return Arrays.deepEquals(serializableArr, comparable);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/debezium/connector/mysql/ReadBinLogIT$TraceLifecycleListener.class */
    /**
     * Lifecycle listener that only logs each client lifecycle callback through the test's logger.
     */
    public static class TraceLifecycleListener implements BinaryLogClient.LifecycleListener {
        protected TraceLifecycleListener() {
        }

        // Logged at debug level: disconnects are not treated as a problem here.
        public void onDisconnect(BinaryLogClient binaryLogClient) {
            ReadBinLogIT.LOGGER.debug("Client disconnected");
        }

        // Logged at debug level.
        public void onConnect(BinaryLogClient binaryLogClient) {
            ReadBinLogIT.LOGGER.debug("Client connected");
        }

        // Logged as a warning together with the exception.
        public void onCommunicationFailure(BinaryLogClient binaryLogClient, Exception exc) {
            ReadBinLogIT.LOGGER.warn("Client communication failure", exc);
        }

        // Logged as an error together with the exception.
        public void onEventDeserializationFailure(BinaryLogClient binaryLogClient, Exception exc) {
            ReadBinLogIT.LOGGER.error("Client received event deserialization failure", exc);
        }
    }

    /* loaded from: input_file:io/debezium/connector/mysql/ReadBinLogIT$UpdateBuilder.class */
    /**
     * Second step of describing an expected row: supplies the values the row should have after
     * the change and returns to the row builder.
     */
    public interface UpdateBuilder {
        /**
         * @param serializableArr the expected column values after the change
         * @return the row builder, so that further rows can be added
         */
        RowBuilder to(Serializable... serializableArr);
    }

    /**
     * Prepares each test: forgets previously recorded events, opens a connection obtained from
     * {@code MySQLConnection.forTestDatabase("readbinlog_test")}, and keeps that connection's configuration.
     */
    @Before
    public void beforeEach() throws TimeoutException, IOException, SQLException, InterruptedException {
        this.events.clear();
        MySQLConnection connection = MySQLConnection.forTestDatabase("readbinlog_test");
        this.conn = connection;
        connection.connect();
        this.config = connection.config();
    }

    /**
     * Cleans up after each test: forgets recorded events, disconnects the binlog client and closes
     * the JDBC connection.
     *
     * <p>The connection is closed exactly once, whether or not disconnecting the client fails. If
     * both steps fail, the exception from closing the connection is the one that propagates.
     *
     * @throws IOException  if disconnecting the client fails
     * @throws SQLException if closing the connection fails
     */
    @After
    public void afterEach() throws IOException, SQLException {
        // The client is still connected at this point, so a listener callback may be adding to the
        // list concurrently; recordEvent() guards the list with this same monitor.
        synchronized (this.events) {
            this.events.clear();
        }
        try {
            if (this.client != null) {
                this.client.disconnect();
            }
        } finally {
            this.client = null;
            try {
                if (this.conn != null) {
                    this.conn.close();
                }
            } finally {
                // Drop the reference, as is done for the client, so a closed connection is never reused.
                this.conn = null;
            }
        }
    }

    /**
     * Starts the binlog client with no extra configuration.
     *
     * @see #startClient(Consumer)
     */
    protected void startClient() throws IOException, TimeoutException, SQLException {
        startClient(null);
    }

    /**
     * Creates, configures and connects the binlog client, then (re)creates the {@code person} table
     * and waits for the two resulting QUERY events before clearing the event queue.
     *
     * @param consumer optional hook that may adjust the client just before it connects; may be {@code null}
     * @throws IOException      if the client cannot connect
     * @throws TimeoutException if the client does not connect, or the two QUERY events do not arrive, in time
     * @throws SQLException     if the table cannot be (re)created
     */
    protected void startClient(Consumer<BinaryLogClient> consumer) throws IOException, TimeoutException, SQLException {
        this.counters = new EventQueue(DEFAULT_TIMEOUT, this::logConsumedEvent, this::logIgnoredEvent);
        this.client = new BinaryLogClient(this.config.getHostname(), this.config.getPort(), "replicator", "replpass");
        this.client.setServerId(this.client.getServerId() - 1);
        this.client.setKeepAlive(false);
        // Register the recorder BEFORE the queue. Tests wait on the queue and then immediately inspect
        // the recorded events, so an event must already be recorded by the time it becomes visible in
        // the queue. NOTE(review): relies on the client notifying listeners in registration order.
        this.client.registerEventListener(this::recordEvent);
        this.client.registerEventListener(this.counters);
        this.client.registerLifecycleListener(new TraceLifecycleListener());
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
        this.client.setEventDeserializer(eventDeserializer);
        if (consumer != null) {
            consumer.accept(this.client);
        }
        this.client.connect(DEFAULT_TIMEOUT);
        this.conn.execute(new String[]{"DROP TABLE IF EXISTS person", "CREATE TABLE person (  name VARCHAR(255) primary key,  age INTEGER NULL DEFAULT 10,  createdAt DATETIME NULL DEFAULT CURRENT_TIMESTAMP,  updatedAt DATETIME NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP)"});
        // One QUERY event is expected per DDL statement above; drop anything else that was buffered.
        this.counters.consume(2, EventType.QUERY);
        this.counters.reset();
    }

    /**
     * Points the client at a binlog file name that should not exist and expects a
     * {@link ServerException}. Currently ignored.
     */
    @Test(expected = ServerException.class)
    @Ignore
    public void shouldFailToConnectToInvalidBinlogFile() throws Exception {
        Testing.Print.enable();
        Consumer<BinaryLogClient> useInvalidBinlogFile =
                binlogClient -> binlogClient.setBinlogFilename("invalid-mysql-binlog.filename.000001");
        startClient(useInvalidBinlogFile);
    }

    /**
     * Starts reading from the binlog file {@code mysql-bin.000001} and prints every event that
     * arrives during the next 20 seconds. Currently ignored.
     */
    @Test
    @Ignore
    public void shouldReadMultipleBinlogFiles() throws Exception {
        Testing.Print.enable();
        Consumer<BinaryLogClient> startAtFirstBinlogFile =
                binlogClient -> binlogClient.setBinlogFilename("mysql-bin.000001");
        startClient(startAtFirstBinlogFile);
        this.counters.consumeAll(20L, TimeUnit.SECONDS);
    }

    /**
     * Inserts, renames and deletes a single {@code person} row, and after each statement checks
     * that exactly one row event of the matching kind has been recorded with the expected values.
     */
    @Test
    public void shouldCaptureSingleWriteUpdateDeleteEvents() throws Exception {
        startClient();

        // INSERT
        this.conn.execute(new String[]{"INSERT INTO person(name,age) VALUES ('Georgia',30)"});
        this.counters.consume(1, WriteRowsEventData.class);
        WriteRowsEventData inserted = recordedEventData(WriteRowsEventData.class, 1).get(0);
        RowBuilder expectedInsert = rows().insertedRow("Georgia", 30, any(), any());
        assertRows(inserted, expectedInsert);

        // UPDATE
        this.conn.execute(new String[]{"UPDATE person SET name = 'Maggie' WHERE name = 'Georgia'"});
        this.counters.consume(1, UpdateRowsEventData.class);
        UpdateRowsEventData updated = recordedEventData(UpdateRowsEventData.class, 1).get(0);
        RowBuilder expectedUpdate = rows().changeRow("Georgia", 30, any(), any()).to("Maggie", 30, any(), any());
        assertRows(updated, expectedUpdate);

        // DELETE
        this.conn.execute(new String[]{"DELETE FROM person WHERE name = 'Maggie'"});
        this.counters.consume(1, DeleteRowsEventData.class);
        DeleteRowsEventData deleted = recordedEventData(DeleteRowsEventData.class, 1).get(0);
        RowBuilder expectedDelete = rows().removedRow("Maggie", 30, any(), any());
        assertRows(deleted, expectedDelete);
    }

    /**
     * Executes two INSERTs, two UPDATEs and two DELETEs (two statements per {@code execute} call)
     * and checks that each pair produced two separate row events carrying the expected values.
     */
    @Test
    public void shouldCaptureMultipleWriteUpdateDeleteEvents() throws Exception {
        startClient();

        // INSERT x2
        this.conn.execute(new String[]{"INSERT INTO person(name,age) VALUES ('Georgia',30)", "INSERT INTO person(name,age) VALUES ('Janice',19)"});
        this.counters.consume(1, QueryEventData.class);
        this.counters.consume(1, TableMapEventData.class);
        this.counters.consume(2, WriteRowsEventData.class);
        this.counters.consume(1, XidEventData.class);
        List<WriteRowsEventData> inserts = recordedEventData(WriteRowsEventData.class, 2);
        assertRows(inserts.get(0), rows().insertedRow("Georgia", 30, any(), any()));
        assertRows(inserts.get(1), rows().insertedRow("Janice", 19, any(), any()));
        this.counters.reset();

        // UPDATE x2
        this.conn.execute(new String[]{"UPDATE person SET name = 'Maggie' WHERE name = 'Georgia'", "UPDATE person SET name = 'Jamie' WHERE name = 'Janice'"});
        this.counters.consume(1, QueryEventData.class);
        this.counters.consume(1, TableMapEventData.class);
        this.counters.consume(2, UpdateRowsEventData.class);
        this.counters.consume(1, XidEventData.class);
        List<UpdateRowsEventData> updates = recordedEventData(UpdateRowsEventData.class, 2);
        assertRows(updates.get(0), rows().changeRow("Georgia", 30, any(), any()).to("Maggie", 30, any(), any()));
        assertRows(updates.get(1), rows().changeRow("Janice", 19, any(), any()).to("Jamie", 19, any(), any()));
        this.counters.reset();

        // DELETE x2
        this.conn.execute(new String[]{"DELETE FROM person WHERE name = 'Maggie'", "DELETE FROM person WHERE name = 'Jamie'"});
        this.counters.consume(1, QueryEventData.class);
        this.counters.consume(1, TableMapEventData.class);
        this.counters.consume(2, DeleteRowsEventData.class);
        this.counters.consume(1, XidEventData.class);
        List<DeleteRowsEventData> deletes = recordedEventData(DeleteRowsEventData.class, 2);
        assertRows(deletes.get(0), rows().removedRow("Maggie", 30, any(), any()));
        assertRows(deletes.get(1), rows().removedRow("Jamie", 19, any(), any()));
    }

    /**
     * Changes two rows with a single INSERT, a single UPDATE and a single DELETE statement, and
     * checks that each statement produced one row event containing both expected rows.
     */
    @Test
    public void shouldCaptureMultipleWriteUpdateDeletesInSingleEvents() throws Exception {
        startClient();

        // One INSERT carrying two rows
        this.conn.execute(new String[]{"INSERT INTO person(name,age) VALUES ('Georgia',30),('Janice',19)"});
        this.counters.consume(1, QueryEventData.class);
        this.counters.consume(1, TableMapEventData.class);
        this.counters.consume(1, WriteRowsEventData.class);
        this.counters.consume(1, XidEventData.class);
        WriteRowsEventData inserted = recordedEventData(WriteRowsEventData.class, 1).get(0);
        RowBuilder expectedInserts = rows()
                .insertedRow("Georgia", 30, any(), any())
                .insertedRow("Janice", 19, any(), any());
        assertRows(inserted, expectedInserts);
        this.counters.reset();

        // One UPDATE touching both rows
        this.conn.execute(new String[]{"UPDATE person SET name = CASE                           WHEN name = 'Georgia' THEN 'Maggie'                           WHEN name = 'Janice' THEN 'Jamie'                          END WHERE name IN ('Georgia','Janice')"});
        this.counters.consume(1, QueryEventData.class);
        this.counters.consume(1, TableMapEventData.class);
        this.counters.consume(1, UpdateRowsEventData.class);
        this.counters.consume(1, XidEventData.class);
        UpdateRowsEventData updated = recordedEventData(UpdateRowsEventData.class, 1).get(0);
        RowBuilder expectedUpdates = rows()
                .changeRow("Georgia", 30, any(), any()).to("Maggie", 30, any(), any())
                .changeRow("Janice", 19, any(), any()).to("Jamie", 19, any(), any());
        assertRows(updated, expectedUpdates);
        this.counters.reset();

        // One DELETE removing both rows
        this.conn.execute(new String[]{"DELETE FROM person WHERE name IN ('Maggie','Jamie')"});
        this.counters.consume(1, QueryEventData.class);
        this.counters.consume(1, TableMapEventData.class);
        this.counters.consume(1, DeleteRowsEventData.class);
        this.counters.consume(1, XidEventData.class);
        DeleteRowsEventData deleted = recordedEventData(DeleteRowsEventData.class, 1).get(0);
        RowBuilder expectedDeletes = rows()
                .removedRow("Maggie", 30, any(), any())
                .removedRow("Jamie", 19, any(), any());
        assertRows(deleted, expectedDeletes);
    }

    /**
     * Reads {@code mysql-bin.000001} from position 4 for five seconds and prints the SQL of every
     * recorded query event except {@code BEGIN} and {@code COMMIT}. Currently ignored.
     */
    @Test
    @Ignore
    public void shouldCaptureQueryEventData() throws Exception {
        startClient(binlogClient -> {
            binlogClient.setBinlogFilename("mysql-bin.000001");
            binlogClient.setBinlogPosition(4L);
        });
        this.counters.consumeAll(5L, TimeUnit.SECONDS);
        // A negative expected count disables the size assertion in recordedEventData.
        for (QueryEventData queryEvent : recordedEventData(QueryEventData.class, -1)) {
            String sql = queryEvent.getSql();
            boolean transactionMarker = sql.equalsIgnoreCase("BEGIN") || sql.equalsIgnoreCase("COMMIT");
            if (!transactionMarker) {
                System.out.println(queryEvent.getSql());
            }
        }
    }

    // Placeholder: the body is empty, so this test performs no checks and always passes.
    @Test
    public void shouldQueryInformationSchema() throws Exception {
    }

    /**
     * Reports, via {@code Testing.print}, an event that a {@code consume} call accepted.
     *
     * @param event the consumed event
     */
    protected void logConsumedEvent(Event event) {
        String message = "Consumed event: " + event;
        Testing.print(message);
    }

    /**
     * Reports, via {@code Testing.print}, an event that a {@code consume} call polled but did not accept.
     *
     * @param event the ignored event
     */
    protected void logIgnoredEvent(Event event) {
        String message = "Ignored event:  " + event;
        Testing.print(message);
    }

    /**
     * Appends the event to the list of recorded events while holding that list's monitor, the
     * same monitor {@code recordedEventData} holds while reading it.
     *
     * @param event the event to remember
     */
    protected void recordEvent(Event event) {
        final List<Event> recorded = this.events;
        synchronized (recorded) {
            recorded.add(event);
        }
    }

    /**
     * Returns the data of every recorded event whose data is an instance of the given class, in
     * the order the events were recorded.
     *
     * @param cls the event-data class to select; instances of subclasses are included
     * @param i   the exact number of matches expected, or a negative value to skip that check
     * @param <T> the selected event-data type
     * @return a new list holding the matching event data
     * @throws AssertionError presumably raised by the FEST size assertion when {@code i} is
     *                        non-negative and differs from the number of matches (confirm against FEST)
     */
    protected <T extends EventData> List<T> recordedEventData(Class<T> cls, int i) {
        List<T> matches;
        // Snapshot under the same monitor recordEvent() uses, so the list is not modified mid-stream.
        synchronized (this.events) {
            matches = this.events.stream()
                    .<EventData>map(Event::getData)
                    .filter(cls::isInstance)
                    .map(cls::cast)
                    .collect(Collectors.toList());
        }
        if (i > -1) {
            Assertions.assertThat(matches.size()).isEqualTo(i);
        }
        return matches;
    }

    /**
     * Asserts that the actual row has as many values as expected and passes FEST's
     * {@code contains} check for the expected values.
     * NOTE(review): {@code contains} presumably ignores order and duplicates — confirm against FEST.
     *
     * @param serializableArr  the actual row values
     * @param serializableArr2 the expected values
     */
    protected void assertRow(Serializable[] serializableArr, Serializable... serializableArr2) {
        final int actualLength = serializableArr.length;
        final int expectedLength = serializableArr2.length;
        Assertions.assertThat(actualLength).isEqualTo(expectedLength);
        Assertions.assertThat(serializableArr).contains(serializableArr2);
    }

    /**
     * Asserts that the event holds {@code i} rows and that the values of all rows, read row by
     * row and column by column, equal the expected values in that same order.
     *
     * @param writeRowsEventData the insert event to check
     * @param i                  the expected number of rows
     * @param serializableArr    the expected values of all rows, flattened into one sequence
     */
    protected void assertRows(WriteRowsEventData writeRowsEventData, int i, Serializable... serializableArr) {
        Assertions.assertThat(writeRowsEventData.getRows().size()).isEqualTo(i);
        int nextExpected = 0;
        for (Serializable[] row : writeRowsEventData.getRows()) {
            for (int column = 0; column < row.length; column++) {
                Assertions.assertThat(row[column]).isEqualTo(serializableArr[nextExpected++]);
            }
        }
    }

    /**
     * @return the shared wildcard placeholder ({@code ANY_OBJECT}) for use in expected rows
     */
    protected Serializable any() {
        return ANY_OBJECT;
    }

    /**
     * @return a new, empty builder for describing expected rows
     */
    protected RowBuilder rows() {
        return new RowBuilder();
    }

    /**
     * Asserts that the update event holds exactly the expected rows, in any order. Each actual
     * before/after pair must match a distinct expected row.
     *
     * @param updateRowsEventData the update event to check
     * @param rowBuilder          the expected rows; matched rows are removed from it
     */
    protected void assertRows(UpdateRowsEventData updateRowsEventData, RowBuilder rowBuilder) {
        int actualCount = updateRowsEventData.getRows().size();
        int expectedCount = rowBuilder.rows().size();
        Assertions.assertThat(actualCount).isEqualTo(expectedCount);
        for (Map.Entry<Serializable[], Serializable[]> change : updateRowsEventData.getRows()) {
            Serializable[] before = change.getKey();
            Serializable[] after = change.getValue();
            boolean matched = rowBuilder.findUpdatedRow(before, after);
            if (!matched) {
                Assert.fail("Failed to find updated row: " + updateRowsEventData);
            }
        }
    }

    /**
     * Asserts that the insert event holds exactly the expected rows, in any order. Each actual
     * row must match a distinct expected row.
     *
     * @param writeRowsEventData the insert event to check
     * @param rowBuilder         the expected rows; matched rows are removed from it
     */
    protected void assertRows(WriteRowsEventData writeRowsEventData, RowBuilder rowBuilder) {
        int actualCount = writeRowsEventData.getRows().size();
        int expectedCount = rowBuilder.rows().size();
        Assertions.assertThat(actualCount).isEqualTo(expectedCount);
        for (Serializable[] insertedRow : writeRowsEventData.getRows()) {
            boolean matched = rowBuilder.findInsertedRow(insertedRow);
            if (!matched) {
                Assert.fail("Failed to find inserted row: " + writeRowsEventData);
            }
        }
    }

    /**
     * Asserts that the delete event holds exactly the expected rows, in any order. Each actual
     * row must match a distinct expected row.
     *
     * @param deleteRowsEventData the delete event to check
     * @param rowBuilder          the expected rows; matched rows are removed from it
     */
    protected void assertRows(DeleteRowsEventData deleteRowsEventData, RowBuilder rowBuilder) {
        int actualCount = deleteRowsEventData.getRows().size();
        int expectedCount = rowBuilder.rows().size();
        Assertions.assertThat(actualCount).isEqualTo(expectedCount);
        for (Serializable[] deletedRow : deleteRowsEventData.getRows()) {
            boolean matched = rowBuilder.findDeletedRow(deletedRow);
            if (!matched) {
                Assert.fail("Failed to find removed row: " + deleteRowsEventData);
            }
        }
    }
}
