package io.debezium.storage.rocketmq.history;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.storage.rocketmq.RocketMqAdminUtil;
import io.debezium.storage.rocketmq.RocketMqConfig;
import io.debezium.storage.rocketmq.ZeroMessageQueueSelector;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.class */
public class RocketMqSchemaHistory extends AbstractSchemaHistory {
    public static final Field TOPIC = Field.create("schema.history.internal.rocketmq.topic").withDisplayName("Database schema history topic name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("The name of the topic for the database schema history").withValidation(new Field.Validator[]{forRocketMq(Field::isRequired)});
    public static final Field NAME_SRV_ADDR = Field.create("schema.history.internal.rocketmq.name.srv.addr").withDisplayName("NameServer address").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("RocketMQ service discovery service nameserver address configuration").withValidation(new Field.Validator[]{Field::isRequired});
    public static final Field ROCKETMQ_ACL_ENABLE = Field.create("schema.history.internal.rocketmq.acl.enabled").withDisplayName("Access control list enabled").withType(ConfigDef.Type.BOOLEAN).withDefault(false).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("RocketMQ access control enable configuration, default is 'false'");
    public static final Field ROCKETMQ_ACCESS_KEY = Field.create("schema.history.internal.rocketmq.access.key").withDisplayName("RocketMQ access key").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("RocketMQ access key. If " + ROCKETMQ_ACL_ENABLE + " is true, the value cannot be empty");
    public static final Field ROCKETMQ_SECRET_KEY = Field.create("schema.history.internal.rocketmq.secret.key").withDisplayName("RocketMQ secret key").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("RocketMQ secret key. If " + ROCKETMQ_ACL_ENABLE + " is true, the value cannot be empty");
    public static final Field RECOVERY_POLL_ATTEMPTS = Field.create("schema.history.internal.rocketmq.recovery.attempts").withDisplayName("Max attempts to recovery database schema history").withType(ConfigDef.Type.INT).withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 0)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The number of attempts in a row that no data are returned from RocketMQ before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms).").withDefault(60).withValidation(new Field.Validator[]{Field::isInteger});
    public static final Field RECOVERY_POLL_INTERVAL_MS = Field.create("schema.history.internal.rocketmq.recovery.poll.interval.ms").withDisplayName("Poll interval during database schema history recovery (ms)").withType(ConfigDef.Type.INT).withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The number of milliseconds to wait while polling for persisted data during recovery.").withDefault(1000).withValidation(new Field.Validator[]{Field::isLong});
    public static final Field STORE_RECORD_TIMEOUT_MS = Field.create("schema.history.internal.rocketmq.store.record.timeout.ms").withDisplayName("Timeout for sending messages to RocketMQ").withType(ConfigDef.Type.INT).withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("Timeout for sending messages to RocketMQ.").withDefault(60000).withValidation(new Field.Validator[]{Field::isLong});
    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqSchemaHistory.class);
    private static final Integer MESSAGE_QUEUE = 0;
    private final DocumentReader reader = DocumentReader.defaultReader();
    private String topicName;
    private String dbHistoryName;
    private DefaultMQProducer producer;
    private RocketMqConfig rocketMqConfig;
    private int maxRecoveryAttempts;
    private Long pollInterval;
    private Long sendingTimeout;

    /* renamed from: io.debezium.storage.rocketmq.history.RocketMqSchemaHistory$1, reason: invalid class name */
    /* loaded from: input_file:io/debezium/storage/rocketmq/history/RocketMqSchemaHistory$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$client$producer$SendStatus = new int[SendStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$rocketmq$client$producer$SendStatus[SendStatus.SEND_OK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
        }
    }

    private static Field.Validator forRocketMq(Field.Validator validator) {
        return (configuration, field, validationOutput) -> {
            if (RocketMqSchemaHistory.class.getName().equals(configuration.getString(HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY))) {
                return validator.validate(configuration, field, validationOutput);
            }
            return 0;
        };
    }

    public void configure(Configuration configuration, HistoryRecordComparator historyRecordComparator, SchemaHistoryListener schemaHistoryListener, boolean z) {
        super.configure(configuration, historyRecordComparator, schemaHistoryListener, z);
        this.topicName = configuration.getString(TOPIC);
        this.dbHistoryName = configuration.getString(SchemaHistory.NAME, UUID.randomUUID().toString());
        this.maxRecoveryAttempts = configuration.getInteger(RECOVERY_POLL_ATTEMPTS);
        this.pollInterval = Long.valueOf(configuration.getLong(RECOVERY_POLL_INTERVAL_MS));
        this.sendingTimeout = Long.valueOf(configuration.getLong(STORE_RECORD_TIMEOUT_MS));
        LOGGER.info("Configure to store the debezium database history {} to rocketmq topic {}", this.dbHistoryName, this.topicName);
        boolean z2 = configuration.getBoolean(ROCKETMQ_ACL_ENABLE);
        String string = configuration.getString(ROCKETMQ_ACCESS_KEY);
        String string2 = configuration.getString(ROCKETMQ_SECRET_KEY);
        if (z2 && (string == null || string2 == null)) {
            throw new SchemaHistoryException("if " + ROCKETMQ_ACL_ENABLE + " true,the configuration " + ROCKETMQ_ACCESS_KEY + " and " + ROCKETMQ_SECRET_KEY + " cannot be empty");
        }
        this.rocketMqConfig = RocketMqConfig.newBuilder().aclEnable(configuration.getBoolean(ROCKETMQ_ACL_ENABLE)).accessKey(configuration.getString(ROCKETMQ_ACCESS_KEY)).secretKey(configuration.getString(ROCKETMQ_SECRET_KEY)).namesrvAddr(configuration.getString(NAME_SRV_ADDR)).groupId(this.dbHistoryName).build();
    }

    public void initializeStorage() {
        super.initializeStorage();
        LOGGER.info("try to create history topic: {}!", this.topicName);
        RocketMqAdminUtil.createTopic(this.rocketMqConfig, new TopicConfig(this.topicName, 1, 1, 6));
    }

    public synchronized void start() {
        super.start();
        try {
            if (!RocketMqAdminUtil.fetchAllConsumerGroup(this.rocketMqConfig).contains(this.rocketMqConfig.getGroupId())) {
                RocketMqAdminUtil.createGroup(this.rocketMqConfig, this.rocketMqConfig.getGroupId());
            }
            this.producer = RocketMqAdminUtil.initDefaultMqProducer(this.rocketMqConfig);
            this.producer.start();
        } catch (MQClientException e) {
            throw new SchemaHistoryException(e);
        }
    }

    protected void storeRecord(HistoryRecord historyRecord) throws SchemaHistoryException {
        if (this.producer == null) {
            throw new IllegalStateException("No producer is available. Ensure that 'initializeStorage()' is called before storing database schema history records.");
        }
        LOGGER.trace("Storing record into database schema history: {}", historyRecord);
        try {
            Message message = new Message(this.topicName, historyRecord.toString().getBytes());
            SendResult send = this.producer.send(message, new ZeroMessageQueueSelector(), (Object) null, this.sendingTimeout.longValue());
            switch (AnonymousClass1.$SwitchMap$org$apache$rocketmq$client$producer$SendStatus[send.getSendStatus().ordinal()]) {
                case 1:
                    LOGGER.debug("Stored record in topic '{}' partition {} at offset {} ", new Object[]{message.getTopic(), send.getMessageQueue(), send.getMessageQueue()});
                    break;
                default:
                    LOGGER.warn("Stored record in topic '{}' partition {} at offset {}, send status {}", new Object[]{message.getTopic(), send.getMessageQueue(), send.getMessageQueue(), send.getSendStatus()});
                    break;
            }
        } catch (MQClientException | RemotingException | MQBrokerException e) {
            throw new SchemaHistoryException(e);
        } catch (InterruptedException e2) {
            LOGGER.error("Interrupted before record was written into database schema history: {}", historyRecord);
            Thread.currentThread().interrupt();
            throw new SchemaHistoryException(e2);
        }
    }

    protected void recoverRecords(Consumer<HistoryRecord> consumer) {
        DefaultLitePullConsumer defaultLitePullConsumer = null;
        try {
            try {
                defaultLitePullConsumer = RocketMqAdminUtil.initDefaultLitePullConsumer(this.rocketMqConfig, false);
                defaultLitePullConsumer.start();
                MessageQueue select = new ZeroMessageQueueSelector().select(new ArrayList(defaultLitePullConsumer.fetchMessageQueues(this.topicName)), null, null);
                defaultLitePullConsumer.assign(Collections.singleton(select));
                defaultLitePullConsumer.seekToBegin(select);
                long j = -1;
                Long l = null;
                int i = 0;
                while (i <= this.maxRecoveryAttempts) {
                    l = getMaxOffsetOfSchemaHistoryTopic(l, select);
                    LOGGER.debug("End offset of database schema history topic is {}", l);
                    int i2 = 0;
                    for (MessageExt messageExt : defaultLitePullConsumer.poll(this.pollInterval.longValue())) {
                        if (messageExt.getQueueOffset() > j) {
                            HistoryRecord historyRecord = new HistoryRecord(this.reader.read(messageExt.getBody()));
                            LOGGER.trace("Recovering database history: {}", historyRecord);
                            if (historyRecord == null || !historyRecord.isValid()) {
                                LOGGER.warn("Skipping invalid database history record '{}'. This is often not an issue, but if it happens repeatedly please check the '{}' topic.", historyRecord, this.topicName);
                            } else {
                                consumer.accept(historyRecord);
                                LOGGER.trace("Recovered database history: {}", historyRecord);
                            }
                            j = messageExt.getQueueOffset();
                            i2++;
                        }
                    }
                    if (i2 == 0) {
                        LOGGER.debug("No new records found in the database schema history; will retry");
                        i++;
                    } else {
                        LOGGER.debug("Processed {} records from database schema history", Integer.valueOf(i2));
                    }
                    if (j >= l.longValue() - 1) {
                        if (defaultLitePullConsumer != null) {
                            defaultLitePullConsumer.shutdown();
                            return;
                        }
                        return;
                    }
                }
                throw new IllegalStateException("The database schema history couldn't be recovered.");
            } catch (MQClientException | MQBrokerException | IOException | RemotingException | InterruptedException e) {
                throw new SchemaHistoryException(e);
            }
        } catch (Throwable th) {
            if (defaultLitePullConsumer != null) {
                defaultLitePullConsumer.shutdown();
            }
            throw th;
        }
    }

    private Long getMaxOffsetOfSchemaHistoryTopic(Long l, MessageQueue messageQueue) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
        Long valueOf = Long.valueOf(RocketMqAdminUtil.offsets(this.rocketMqConfig, this.topicName).get(messageQueue).getMaxOffset());
        if (l != null && !l.equals(valueOf)) {
            LOGGER.warn("Detected changed end offset of database schema history topic (previous: " + l + ", current: " + valueOf + "). Make sure that the same history topic isn't shared by multiple connector instances.");
        }
        return valueOf;
    }

    public boolean exists() {
        boolean z = false;
        if (storageExists()) {
            Map<MessageQueue, TopicOffset> offsets = RocketMqAdminUtil.offsets(this.rocketMqConfig, this.topicName);
            for (MessageQueue messageQueue : offsets.keySet()) {
                if (MESSAGE_QUEUE.intValue() == messageQueue.getQueueId()) {
                    z = offsets.get(messageQueue).getMaxOffset() > offsets.get(messageQueue).getMinOffset();
                }
            }
        }
        return z;
    }

    public boolean storageExists() {
        return RocketMqAdminUtil.topicExist(this.rocketMqConfig, this.topicName);
    }
}
