package io.debezium.connector.mysql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.storage.file.history.FileSchemaHistory;
import io.debezium.util.Testing;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.sql.Connection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mysql/MySqlMetricsIT.class */
public class MySqlMetricsIT extends AbstractAsyncEngineConnectorTest {
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-metrics.txt").toAbsolutePath();
    private static final String SERVER_NAME = "myserver";
    private final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "connector_metrics_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
    private static final String INSERT1 = "INSERT INTO simple (val) VALUES (25);";
    private static final String INSERT2 = "INSERT INTO simple (val) VALUES (50);";

    @Before
    public void before() throws Exception {
        stopConnector();
        this.DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
    }

    @After
    public void after() throws Exception {
        try {
            stopConnector();
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        } catch (Throwable th) {
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
            throw th;
        }
    }

    @Test
    public void testLifecycle() throws Exception {
        start(MySqlConnector.class, this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL).with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class).with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("simple")).with(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE).with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        waitForStreamingToStart();
        stopConnector();
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        try {
            platformMBeanServer.getMBeanInfo(getSnapshotMetricsObjectName());
            Assert.fail("Expected Snapshot Metrics no longer to exist");
        } catch (InstanceNotFoundException e) {
        }
        try {
            platformMBeanServer.getMBeanInfo(getStreamingMetricsObjectName());
            Assert.fail("Expected Streaming Metrics no longer to exist");
        } catch (InstanceNotFoundException e2) {
        }
    }

    @Test
    public void testSnapshotOnlyMetrics() throws Exception {
        Connection connection = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName()).connection();
        try {
            connection.createStatement().execute(INSERT1);
            connection.createStatement().execute(INSERT2);
            if (connection != null) {
                connection.close();
            }
            start(MySqlConnector.class, this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL_ONLY).with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class).with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("simple")).with(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE).with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE).build());
            assertSnapshotMetrics();
            assertStreamingMetricsExist();
        } catch (Throwable th) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testPauseResumeSnapshotMetrics() throws Exception {
        String qualifiedTableName = this.DATABASE.qualifiedTableName("simple");
        String qualifiedTableName2 = this.DATABASE.qualifiedTableName("debezium_signal");
        Connection connection = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName()).connection();
        for (int i = 1; i < 1000; i++) {
            try {
                connection.createStatement().execute(String.format("INSERT INTO %s (val) VALUES (%d);", qualifiedTableName, Integer.valueOf(i)));
            } finally {
            }
        }
        if (connection != null) {
            connection.close();
        }
        start(MySqlConnector.class, this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL).with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class).with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, String.format("%s", qualifiedTableName)).with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE).with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1).with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5).with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, qualifiedTableName2).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        waitForStreamingToStart();
        ArrayList arrayList = new ArrayList();
        consumeRecords(1000, sourceRecord -> {
            arrayList.add(sourceRecord);
        });
        connection = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName()).connection();
        try {
            connection.createStatement().execute(String.format("INSERT INTO debezium_signal VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"%s\"]}')", qualifiedTableName));
            connection.createStatement().execute(String.format("INSERT INTO %s VALUES('test-pause', 'pause-snapshot', '')", qualifiedTableName2));
            Thread.sleep(1500L);
            connection.createStatement().execute(String.format("INSERT INTO debezium_signal VALUES('test-resume', 'resume-snapshot', '')", qualifiedTableName2));
            if (connection != null) {
                connection.close();
            }
            consumeRecords(1000, sourceRecord2 -> {
                arrayList.add(sourceRecord2);
            });
            Assert.assertTrue(arrayList.size() >= 2000);
            assertSnapshotPauseNotZero();
        } finally {
        }
    }

    @Test
    public void testSnapshotAndStreamingMetrics() throws Exception {
        Connection connection = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName()).connection();
        try {
            connection.createStatement().execute(INSERT1);
            connection.createStatement().execute(INSERT2);
            if (connection != null) {
                connection.close();
            }
            start(MySqlConnector.class, this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL).with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class).with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("simple")).with(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE).with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE).build());
            assertSnapshotMetrics();
            assertStreamingMetrics(0L);
        } catch (Throwable th) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testStreamingOnlyMetrics() throws Exception {
        start(MySqlConnector.class, this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER).with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class).with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("simple")).with(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE).with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE).build());
        assertStreamingMetrics(4L);
        assertSnapshotMetricsExist();
    }

    private void assertNoSnapshotMetricsExist() throws Exception {
        try {
            ManagementFactory.getPlatformMBeanServer().getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted");
            Assert.fail("Expected Snapshot Metrics to not exist");
        } catch (InstanceNotFoundException e) {
        }
    }

    private void assertNoStreamingMetricsExist() throws Exception {
        try {
            ManagementFactory.getPlatformMBeanServer().getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen");
            Assert.fail("Expected Streaming Metrics to not exist");
        } catch (InstanceNotFoundException e) {
        }
    }

    private void assertStreamingMetricsExist() throws Exception {
        try {
            ManagementFactory.getPlatformMBeanServer().getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen");
        } catch (InstanceNotFoundException e) {
            Assert.fail("Streaming Metrics should exist");
        }
    }

    private void assertSnapshotMetricsExist() throws Exception {
        try {
            ManagementFactory.getPlatformMBeanServer().getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted");
        } catch (InstanceNotFoundException e) {
            Assert.fail("Snapshot Metrics should exist");
        }
    }

    private void assertSnapshotPauseNotZero() throws Exception {
        try {
            Assert.assertTrue(((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds")).longValue() > 0);
        } catch (InstanceNotFoundException e) {
            Assert.fail("Snapshot Metrics should exist");
        }
    }

    private void assertSnapshotMetrics() throws Exception {
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        waitForSnapshotToBeCompleted();
        consumeRecords(8);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalTableCount")).isEqualTo(1);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "CapturedTables")).isEqualTo(new String[]{this.DATABASE.qualifiedTableName("simple")});
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(2L);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "NumberOfEventsFiltered")).isEqualTo(0L);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "NumberOfErroneousEvents")).isEqualTo(0L);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "RemainingTableCount")).isEqualTo(0);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(true);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPaused")).isEqualTo(false);
        Assertions.assertThat(platformMBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds")).isEqualTo(0L);
    }

    private void assertStreamingMetrics(long j) throws Exception {
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        waitForStreamingToStart();
        Connection connection = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName()).connection();
        try {
            connection.createStatement().execute(INSERT1);
            connection.createStatement().execute(INSERT2);
            if (connection != null) {
                connection.close();
            }
            waitForAvailableRecords(30L, TimeUnit.SECONDS);
            consumeAvailableRecords(VerifyRecord::print);
            Assertions.assertThat(platformMBeanServer.getAttribute(getStreamingMetricsObjectName(), "Connected")).isEqualTo(true);
            Assertions.assertThat((Long) platformMBeanServer.getAttribute(getStreamingMetricsObjectName(), "TotalNumberOfEventsSeen")).isGreaterThanOrEqualTo(j);
            Awaitility.await().atMost(Duration.ofMinutes(1L)).until(() -> {
                return Boolean.valueOf(((String[]) platformMBeanServer.getAttribute(getStreamingMetricsObjectName(), "CapturedTables")).length > 0);
            });
            Assertions.assertThat(platformMBeanServer.getAttribute(getStreamingMetricsObjectName(), "CapturedTables")).isEqualTo(new String[]{this.DATABASE.qualifiedTableName("simple")});
        } catch (Throwable th) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException {
        return getSnapshotMetricsObjectName("mysql", SERVER_NAME);
    }

    private ObjectName getStreamingMetricsObjectName() throws MalformedObjectNameException {
        return getStreamingMetricsObjectName("mysql", SERVER_NAME, getStreamingNamespace());
    }

    private void waitForSnapshotToBeCompleted() throws InterruptedException {
        waitForSnapshotToBeCompleted("mysql", SERVER_NAME);
    }

    private void waitForStreamingToStart() throws InterruptedException {
        waitForStreamingRunning("mysql", SERVER_NAME, getStreamingNamespace());
    }
}
