package io.debezium.pipeline.signal.channels;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.pipeline.signal.SignalRecord;
import io.debezium.util.Collect;
import io.debezium.util.Loggings;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/pipeline/signal/channels/KafkaSignalChannel.class */
public class KafkaSignalChannel implements SignalChannelReader {
    public static final String CONFIGURATION_FIELD_PREFIX_STRING = "signal.";
    private static final String CONSUMER_PREFIX = "signal.consumer.";
    public static final String CHANNEL_OFFSET = "channelOffset";
    public static final String CHANNEL_NAME = "kafka";
    private String topicName;
    private String connectorName;
    private Duration pollTimeoutMs;
    private KafkaConsumer<String, String> signalsConsumer;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSignalChannel.class);
    public static final Field SIGNAL_TOPIC = Field.create("signal.kafka.topic").withDisplayName("Signal topic name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("The name of the topic for the signals to the connector").withValidation(Field::isRequired);
    public static final Field BOOTSTRAP_SERVERS = Field.create("signal.kafka.bootstrap.servers").withDisplayName("Kafka broker addresses").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("A list of host/port pairs that the connector will use for establishing the initial connection to the Kafka cluster for retrieving signals to the connector.This should point to the same Kafka cluster used by the Kafka Connect process.").withValidation(Field::isRequired);
    public static final Field SIGNAL_POLL_TIMEOUT_MS = Field.create("signal.kafka.poll.timeout.ms").withDisplayName("Poll timeout for kafka signals (ms)").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The number of milliseconds to wait while polling signals.").withDefault(0).withValidation(Field::isNonNegativeInteger);

    private Optional<SignalRecord> processSignal(ConsumerRecord<String, String> consumerRecord) {
        if (!this.connectorName.equals(consumerRecord.key())) {
            LOGGER.info("Signal key '{}' doesn't match the connector's name '{}'", consumerRecord.key(), this.connectorName);
            return Optional.empty();
        }
        String str = (String) consumerRecord.value();
        LOGGER.trace("Processing signal: {}", str);
        if (str == null || str.isEmpty()) {
            return Optional.empty();
        }
        Optional<Document> parseJson = parseJson(str);
        if (parseJson.isEmpty()) {
            return Optional.empty();
        }
        Document document = parseJson.get();
        return Optional.of(new SignalRecord(document.getString("id"), document.getString("type"), document.getDocument(CloudEventsMaker.FieldName.DATA).toString(), Map.of(CHANNEL_OFFSET, Long.valueOf(consumerRecord.offset()))));
    }

    private static Optional<Document> parseJson(String str) {
        try {
            return Optional.of(DocumentReader.defaultReader().read(str));
        } catch (IOException e) {
            Loggings.logErrorAndTraceRecord(LOGGER, str, "Skipped signal due to an error", e);
            return Optional.empty();
        }
    }

    public void seek(long j) {
        this.signalsConsumer.seek(new TopicPartition(this.topicName, 0), j + 1);
    }

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public String name() {
        return CHANNEL_NAME;
    }

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public void init(CommonConnectorConfig commonConnectorConfig) {
        this.connectorName = commonConnectorConfig.getLogicalName();
        Configuration build = commonConnectorConfig.getConfig().subset("signal.", false).edit().withDefault(SIGNAL_TOPIC, this.connectorName + "-signal").build();
        this.topicName = build.getString(SIGNAL_TOPIC);
        this.pollTimeoutMs = Duration.ofMillis(build.getInteger(SIGNAL_POLL_TIMEOUT_MS));
        this.signalsConsumer = new KafkaConsumer<>(buildKafkaConfiguration("kafka-signal", build).asProperties());
        LOGGER.info("Subscribing to signals topic '{}'", this.topicName);
        this.signalsConsumer.assign(Collect.arrayListOf(new TopicPartition(this.topicName, 0), new TopicPartition[0]));
    }

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public void reset(Object obj) {
        this.signalsConsumer.seek(new TopicPartition(this.topicName, 0), ((Long) obj).longValue() + 1);
    }

    private static Configuration buildKafkaConfiguration(String str, Configuration configuration) {
        return configuration.subset(CONSUMER_PREFIX, true).edit().withDefault("bootstrap.servers", configuration.getString(BOOTSTRAP_SERVERS)).withDefault("client.id", UUID.randomUUID().toString()).withDefault("group.id", str).withDefault("fetch.min.bytes", 1).withDefault("enable.auto.commit", false).withDefault("session.timeout.ms", 10000).withDefault("key.deserializer", StringDeserializer.class).withDefault("value.deserializer", StringDeserializer.class).build();
    }

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public List<SignalRecord> read() {
        LOGGER.debug("Reading signal form kafka");
        return (List) StreamSupport.stream(this.signalsConsumer.poll(this.pollTimeoutMs.toMillis()).spliterator(), false).map(this::processSignal).filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).collect(Collectors.toList());
    }

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public void close() {
        this.signalsConsumer.close();
    }
}
