package io.debezium.pipeline.notification;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.notification.Notification;
import io.debezium.pipeline.notification.channels.LogNotificationChannel;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.schema.SchemaFactory;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:io/debezium/pipeline/notification/NotificationServiceTest.class */
public class NotificationServiceTest {
    public static final String NOTIFICATION_ID = UUID.fromString("a5dc3ab8-933d-4aae-a994-2e5f9d47acd2").toString();
    private final SourceRecord expectedRecord = buildRecord();
    private boolean isConsumerCalled = false;

    @Mock
    private CommonConnectorConfig connectorConfig;

    private SourceRecord buildRecord() {
        Schema build = SchemaBuilder.struct().name("io.debezium.connector.common.NotificationKey").field("id", SchemaBuilder.STRING_SCHEMA).build();
        Schema build2 = SchemaBuilder.struct().name("io.debezium.connector.common.Notification").field("id", SchemaBuilder.STRING_SCHEMA).field("type", SchemaBuilder.STRING_SCHEMA).field("aggregate_type", SchemaBuilder.STRING_SCHEMA).field("additional_data", SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA, SchemaBuilder.STRING_SCHEMA)).field("timestamp", Schema.INT64_SCHEMA).build();
        return new SourceRecord(Map.of(), Map.of(), "notificationTopic", (Integer) null, build, new Struct(build).put("id", NOTIFICATION_ID), build2, new Struct(build2).put("id", NOTIFICATION_ID).put("type", "Test").put("aggregate_type", "Test").put("additional_data", Map.of("k1", "v1")).put("timestamp", 1695817046353L));
    }

    @Test
    public void testNotificationWithLogNotificationChannel() {
        Mockito.when(this.connectorConfig.getEnabledNotificationChannels()).thenReturn(List.of("log"));
        LogInterceptor logInterceptor = new LogInterceptor((Class<?>) LogNotificationChannel.class);
        new NotificationService(List.of(new LogNotificationChannel()), this.connectorConfig, new SchemaFactory(), this::consume).notify(Notification.Builder.builder().withId(NOTIFICATION_ID).withType("Test").withAggregateType("Test").withAdditionalData(Map.of("Key1", "Value1")).withTimestamp(1684279500L).build());
        Assertions.assertThat(logInterceptor.containsMessage("[Notification Service]  {aggregateType='Test', type='Test', additionalData={Key1=Value1}, timestamp=1684279500}")).isTrue();
    }

    @Test
    public void notificationSentOnKafkaChannelWillBeCorrectlyProcessed() {
        Mockito.when(this.connectorConfig.getNotificationTopic()).thenReturn("io.debezium.notification");
        Mockito.when(this.connectorConfig.getEnabledNotificationChannels()).thenReturn(List.of("sink"));
        new NotificationService(List.of(new SinkNotificationChannel()), this.connectorConfig, new SchemaFactory(), this::consume).notify(Notification.Builder.builder().withId(NOTIFICATION_ID).withType("Test").withAggregateType("Test").withAdditionalData(Map.of("k1", "v1")).withTimestamp(1695817046353L).build());
        Assertions.assertThat(this.isConsumerCalled).isTrue();
    }

    private void consume(SourceRecord sourceRecord) {
        Assertions.assertThat(((Struct) sourceRecord.value()).toString()).isEqualTo(((Struct) this.expectedRecord.value()).toString());
        Assertions.assertThat(((Struct) sourceRecord.key()).toString()).isEqualTo(((Struct) this.expectedRecord.key()).toString());
        Assertions.assertThat(sourceRecord.topic()).isEqualTo("io.debezium.notification");
        this.isConsumerCalled = true;
    }
}
