package io.debezium.pipeline.notification;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.pipeline.notification.channels.jmx.JmxNotificationChannelMXBean;
import java.lang.management.ManagementFactory;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.IntrospectionException;
import javax.management.JMX;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.Notification;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Percentage;
import org.awaitility.Awaitility;
import org.junit.Test;

/* loaded from: input_file:io/debezium/pipeline/notification/AbstractNotificationsIT.class */
public abstract class AbstractNotificationsIT<T extends SourceConnector> extends AbstractAsyncEngineConnectorTest {

    /* loaded from: input_file:io/debezium/pipeline/notification/AbstractNotificationsIT$ClientListener.class */
    public static class ClientListener implements NotificationListener {
        public void handleNotification(Notification notification, Object obj) {
            ((List) obj).add(notification);
        }
    }

    protected abstract Class<T> connectorClass();

    protected abstract Configuration.Builder config();

    protected abstract String connector();

    protected abstract String server();

    protected String task() {
        return null;
    }

    protected String database() {
        return null;
    }

    protected List<String> collections() {
        return Collections.emptyList();
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> function) {
        start(connectorClass(), function.apply(config()).build());
    }

    protected abstract String snapshotStatusResult();

    @Test
    public void notificationCorrectlySentOnItsTopic() throws InterruptedException {
        startConnector(builder -> {
            return builder.with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification").with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink");
        });
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
        ArrayList arrayList = new ArrayList();
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            consumeAvailableRecords(sourceRecord -> {
                if (sourceRecord.topic().equals("io.debezium.notification")) {
                    arrayList.add(sourceRecord);
                }
            });
            return Boolean.valueOf(arrayList.size() == calculateNotificationSize());
        });
        Assertions.assertThat(arrayList).hasSize(calculateNotificationSize());
        SourceRecord sourceRecord = (SourceRecord) arrayList.get(0);
        Assertions.assertThat(sourceRecord.topic()).isEqualTo("io.debezium.notification");
        Assertions.assertThat(((Struct) sourceRecord.value()).getString("aggregate_type")).isEqualTo("Initial Snapshot");
        Assertions.assertThat(((Struct) sourceRecord.value()).getString("type")).isEqualTo("STARTED");
        Assertions.assertThat(((Struct) sourceRecord.value()).getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1.0d));
        collections().forEach(str -> {
            assertTableNotificationsSentToTopic(arrayList, str);
        });
        SourceRecord sourceRecord2 = (SourceRecord) arrayList.get(arrayList.size() - 1);
        Assertions.assertThat(sourceRecord2.topic()).isEqualTo("io.debezium.notification");
        Assertions.assertThat(((Struct) sourceRecord2.value()).getString("aggregate_type")).isEqualTo("Initial Snapshot");
        Assertions.assertThat(((Struct) sourceRecord2.value()).getString("type")).isEqualTo(snapshotStatusResult());
        Assertions.assertThat(((Struct) sourceRecord2.value()).getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1.0d));
    }

    @Test
    public void notificationNotSentIfNoChannelIsConfigured() {
        startConnector(builder -> {
            return builder.with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification");
        });
        assertConnectorIsRunning();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        assertNoRecordsToConsume();
    }

    @Test
    public void reportErrorWhenSinkChannelIsEnabledAndNoTopicConfigurationProvided() {
        LogInterceptor logInterceptor = new LogInterceptor("io.debezium.connector");
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink");
        });
        Assertions.assertThat(logInterceptor.containsErrorMessage("Connector configuration is not valid. The 'notification.sink.topic.name' value is invalid: Notification topic name must be provided when kafka notification channel is enabled")).isTrue();
    }

    @Test
    public void notificationCorrectlySentOnJmx() throws ReflectionException, MalformedObjectNameException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException, MBeanException, InterruptedException {
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "jmx");
        });
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
        Awaitility.await().atMost(30L, TimeUnit.SECONDS).pollDelay(1L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!readNotificationFromJmx().isEmpty());
        });
        List<Notification> readNotificationFromJmx = readNotificationFromJmx();
        Assertions.assertThat(readNotificationFromJmx).hasSize(calculateNotificationSize());
        Assertions.assertThat(readNotificationFromJmx.get(0)).hasFieldOrPropertyWithValue("aggregateType", "Initial Snapshot").hasFieldOrPropertyWithValue("type", "STARTED").hasFieldOrProperty("timestamp");
        collections().forEach(str -> {
            assertTableNotificationsSentToJmx(readNotificationFromJmx, str);
        });
        Assertions.assertThat(readNotificationFromJmx.get(readNotificationFromJmx.size() - 1)).hasFieldOrPropertyWithValue("aggregateType", "Initial Snapshot").hasFieldOrPropertyWithValue("type", snapshotStatusResult()).hasFieldOrProperty("timestamp");
        resetNotifications();
        Assertions.assertThat(readNotificationFromJmx()).hasSize(0);
    }

    @Test
    public void emittingDebeziumNotificationWillGenerateAJmxNotification() throws ReflectionException, MalformedObjectNameException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException, MBeanException, InterruptedException, JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.SNAPSHOT_DELAY_MS, 2000).with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "jmx");
        });
        List<Notification> registerJmxNotificationListener = registerJmxNotificationListener();
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
        Assertions.assertThat(readJmxNotifications()).allSatisfy(mBeanNotificationInfo -> {
            Assertions.assertThat(mBeanNotificationInfo.getName()).isEqualTo(Notification.class.getName());
        });
        Assertions.assertThat(registerJmxNotificationListener).hasSize(calculateNotificationSize());
        Assertions.assertThat(registerJmxNotificationListener.get(0)).hasFieldOrPropertyWithValue("message", "Initial Snapshot generated a notification");
        Notification notification = (Notification) objectMapper.readValue(registerJmxNotificationListener.get(0).getUserData().toString(), Notification.class);
        Assertions.assertThat(notification).hasFieldOrPropertyWithValue("aggregateType", "Initial Snapshot").hasFieldOrPropertyWithValue("type", "STARTED").hasFieldOrPropertyWithValue("additionalData", Map.of("connector_name", server()));
        Assertions.assertThat(notification.getTimestamp()).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1.0d));
        Assertions.assertThat(registerJmxNotificationListener.get(registerJmxNotificationListener.size() - 1)).hasFieldOrPropertyWithValue("message", "Initial Snapshot generated a notification");
        Notification notification2 = (Notification) objectMapper.readValue(registerJmxNotificationListener.get(registerJmxNotificationListener.size() - 1).getUserData().toString(), Notification.class);
        Assertions.assertThat(notification2).hasFieldOrPropertyWithValue("aggregateType", "Initial Snapshot").hasFieldOrPropertyWithValue("type", "COMPLETED").hasFieldOrPropertyWithValue("additionalData", Map.of("connector_name", server()));
        Assertions.assertThat(notification2.getTimestamp()).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1.0d));
    }

    private void assertTableNotificationsSentToJmx(List<Notification> list, String str) {
        Optional<Notification> findAny = list.stream().filter(notification -> {
            return notification.getType().equals("IN_PROGRESS") && notification.getAdditionalData().containsValue(str);
        }).findAny();
        Assertions.assertThat(findAny.isPresent()).isTrue();
        Assertions.assertThat((String) findAny.get().getAdditionalData().get("data_collections")).contains(collections());
        Assertions.assertThat(findAny.get().getAggregateType()).isEqualTo("Initial Snapshot");
        Assertions.assertThat(findAny.get().getTimestamp()).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1.0d));
        Optional<Notification> findAny2 = list.stream().filter(notification2 -> {
            return notification2.getType().equals("TABLE_SCAN_COMPLETED") && notification2.getAdditionalData().containsValue(str);
        }).findAny();
        Assertions.assertThat(findAny2.isPresent()).isTrue();
        Assertions.assertThat((String) findAny2.get().getAdditionalData().get("status")).isEqualTo("SUCCEEDED");
        Assertions.assertThat((String) findAny2.get().getAdditionalData().get("data_collections")).contains(collections());
        Assertions.assertThat(findAny2.get().getAggregateType()).isEqualTo("Initial Snapshot");
        Assertions.assertThat(findAny2.get().getTimestamp()).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1.0d));
    }

    private void assertTableNotificationsSentToTopic(List<SourceRecord> list, String str) {
        Optional findAny = list.stream().map(sourceRecord -> {
            return (Struct) sourceRecord.value();
        }).filter(struct -> {
            return struct.getString("type").equals("IN_PROGRESS") && struct.getMap("additional_data").containsValue(str);
        }).findAny();
        Assertions.assertThat(findAny.isPresent()).isTrue();
        Assertions.assertThat(((Struct) findAny.get()).getString("aggregate_type")).isEqualTo("Initial Snapshot");
        Assertions.assertThat(((Struct) findAny.get()).getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1.0d));
        Optional findAny2 = list.stream().map(sourceRecord2 -> {
            return (Struct) sourceRecord2.value();
        }).filter(struct2 -> {
            return struct2.getString("type").equals("TABLE_SCAN_COMPLETED") && struct2.getMap("additional_data").containsValue(str);
        }).findAny();
        Assertions.assertThat(findAny2.isPresent()).isTrue();
        Assertions.assertThat(((Struct) findAny2.get()).getMap("additional_data").get("status")).isEqualTo("SUCCEEDED");
        Assertions.assertThat(((Struct) findAny2.get()).getMap("additional_data").get("data_collections").toString()).contains(collections());
        Assertions.assertThat(((Struct) findAny2.get()).getString("aggregate_type")).isEqualTo("Initial Snapshot");
        Assertions.assertThat(((Struct) findAny2.get()).getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1.0d));
    }

    private List<Notification> readNotificationFromJmx() throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException, MBeanException {
        ObjectName objectName = getObjectName();
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        Assertions.assertThat((List) Arrays.stream(platformMBeanServer.getMBeanInfo(objectName).getAttributes()).map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList())).contains(new String[]{"Notifications"});
        return ((JmxNotificationChannelMXBean) JMX.newMXBeanProxy(platformMBeanServer, objectName, JmxNotificationChannelMXBean.class)).getNotifications();
    }

    private MBeanNotificationInfo[] readJmxNotifications() throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException, MBeanException {
        return ManagementFactory.getPlatformMBeanServer().getMBeanInfo(getObjectName()).getNotifications();
    }

    private ObjectName getObjectName() throws MalformedObjectNameException {
        return new ObjectName(String.format("debezium.%s:type=management,context=notifications,server=%s", connector(), server()));
    }

    private List<Notification> registerJmxNotificationListener() throws MalformedObjectNameException, InstanceNotFoundException {
        ObjectName objectName = getObjectName();
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        ArrayList arrayList = new ArrayList();
        platformMBeanServer.addNotificationListener(objectName, new ClientListener(), (NotificationFilter) null, arrayList);
        return arrayList;
    }

    private void resetNotifications() throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, MBeanException {
        ManagementFactory.getPlatformMBeanServer().invoke(getObjectName(), "reset", new Object[0], new String[0]);
    }

    private int calculateNotificationSize() {
        return (collections().size() * 2) + 2;
    }
}
