package io.debezium.storage.redis.history;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.storage.redis.RedisClient;
import io.debezium.storage.redis.RedisClientConnectionException;
import io.debezium.storage.redis.RedisConnection;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Loggings;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Schema history implementation that keeps its records in Redis.
 *
 * <p>Each {@link HistoryRecord} is serialized with a {@link DocumentWriter} and appended via
 * {@code xadd} under the key returned by {@code config.getRedisKeyName()}, stored in a single
 * field named {@code "schema"}. Recovery reads all entries back with {@code xrange} and replays
 * them in the returned order.
 *
 * <p>Both reading and writing retry indefinitely, sleeping between attempts according to an
 * exponential {@link DelayStrategy} bounded by the configured initial and maximum retry delays.
 */
@ThreadSafe
public class RedisSchemaHistory extends AbstractSchemaHistory {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSchemaHistory.class);

    /** Name of the single field under which the serialized record is stored in each entry. */
    private static final String SCHEMA_FIELD = "schema";

    private Duration initialRetryDelay;
    private Duration maxRetryDelay;
    private final DocumentWriter writer = DocumentWriter.defaultWriter();
    private final DocumentReader reader = DocumentReader.defaultReader();
    /** Cleared in {@link #stop()}; not read anywhere in this class. */
    private final AtomicBoolean running = new AtomicBoolean();
    /** Current Redis client; {@code null} until connected or after a broken client was discarded. */
    private RedisClient client;
    private RedisSchemaHistoryConfig config;

    /**
     * Opens a new Redis connection from the current configuration and stores the resulting client
     * in {@link #client}. Any exception raised while connecting propagates to the caller and
     * leaves {@link #client} unchanged.
     */
    void connect() {
        RedisConnection connection = new RedisConnection(
                this.config.getAddress(),
                this.config.getUser(),
                this.config.getPassword(),
                this.config.getConnectionTimeout().intValue(),
                this.config.getSocketTimeout().intValue(),
                this.config.isSslEnabled());
        this.client = connection.getRedisClient(
                RedisConnection.DEBEZIUM_SCHEMA_HISTORY,
                this.config.isWaitEnabled(),
                this.config.getWaitTimeout(),
                this.config.isWaitRetryEnabled(),
                this.config.getWaitRetryDelay());
    }

    /**
     * Reads the Redis-specific settings and retry delays, then delegates to the superclass.
     *
     * @param configuration           the configuration to read the Redis settings from
     * @param historyRecordComparator forwarded unchanged to the superclass
     * @param schemaHistoryListener   forwarded unchanged to the superclass
     * @param z                       flag forwarded unchanged to the superclass
     */
    public void configure(Configuration configuration, HistoryRecordComparator historyRecordComparator, SchemaHistoryListener schemaHistoryListener, boolean z) {
        this.config = new RedisSchemaHistoryConfig(configuration);
        this.initialRetryDelay = Duration.ofMillis(this.config.getInitialRetryDelay().intValue());
        this.maxRetryDelay = Duration.ofMillis(this.config.getMaxRetryDelay().intValue());
        super.configure(configuration, historyRecordComparator, schemaHistoryListener, z);
    }

    /**
     * Starts the superclass and opens the initial Redis connection. A connection failure here is
     * not retried; it propagates to the caller.
     */
    public synchronized void start() {
        super.start();
        LOGGER.info("Starting RedisSchemaHistory");
        connect();
    }

    /**
     * Serializes the record and appends it to Redis, retrying until the write succeeds.
     *
     * @param historyRecord the record to persist; {@code null} is ignored
     * @throws SchemaHistoryException if the record cannot be serialized (the I/O failure is
     *                                attached as the cause)
     */
    protected void storeRecord(HistoryRecord historyRecord) throws SchemaHistoryException {
        if (historyRecord == null) {
            return;
        }

        final String line;
        try {
            line = this.writer.write(historyRecord.document());
        } catch (IOException e) {
            Loggings.logErrorAndTraceRecord(LOGGER, historyRecord, "Failed to convert record to string", e);
            throw new SchemaHistoryException("Unable to write database schema history record", e);
        }

        DelayStrategy backoff = DelayStrategy.exponential(this.initialRetryDelay, this.maxRetryDelay);
        while (true) {
            try {
                if (this.client == null) {
                    connect();
                }
                this.client.xadd(this.config.getRedisKeyName(), Collections.singletonMap(SCHEMA_FIELD, line));
                LOGGER.trace("Record written to database schema history in Redis: {}", line);
                return;
            } catch (RedisClientConnectionException e) {
                LOGGER.warn("Attempting to reconnect to Redis");
                // Reconnect on the next iteration, inside the try, so that a failing reconnect
                // is retried instead of escaping this loop.
                discardClient();
            } catch (Exception e) {
                LOGGER.warn("Writing to database schema history stream failed", e);
                LOGGER.warn("Will retry");
            }
            backoff.sleepWhen(true);
        }
    }

    /**
     * Disconnects the Redis client (if any) and stops the superclass. The superclass is stopped
     * even when disconnecting throws; the exception still propagates afterwards.
     */
    public void stop() {
        this.running.set(false);
        try {
            if (this.client != null) {
                this.client.disconnect();
            }
        } finally {
            super.stop();
        }
    }

    /**
     * Reads every stored entry from Redis (retrying until the read succeeds) and passes each
     * decoded record to {@code consumer} in the order returned by Redis.
     *
     * <p>If an entry cannot be parsed, the error is logged and recovery stops at that entry;
     * records already handed to the consumer are not rolled back.
     *
     * @param consumer receives each recovered record
     */
    protected synchronized void recoverRecords(Consumer<HistoryRecord> consumer) {
        for (Map<String, String> entry : readAllEntries()) {
            try {
                consumer.accept(new HistoryRecord(this.reader.read(entry.get(SCHEMA_FIELD))));
            } catch (IOException e) {
                LOGGER.error("Failed to convert record to string: {}", entry, e);
                return;
            }
        }
    }

    /**
     * Always reports the storage as existing.
     *
     * @return {@code true}
     */
    public boolean storageExists() {
        return true;
    }

    /**
     * Reports whether any history has been stored under the configured key.
     *
     * @return {@code true} if a client is connected and {@code xlen} for the configured key is
     *         greater than zero; {@code false} otherwise, including when no client exists yet
     */
    public boolean exists() {
        return this.client != null && this.client.xlen(this.config.getRedisKeyName()) > 0;
    }

    /** Fetches all entries stored under the configured key, retrying with back-off until it succeeds. */
    private List<Map<String, String>> readAllEntries() {
        DelayStrategy backoff = DelayStrategy.exponential(this.initialRetryDelay, this.maxRetryDelay);
        while (true) {
            try {
                if (this.client == null) {
                    connect();
                }
                return this.client.xrange(this.config.getRedisKeyName());
            } catch (RedisClientConnectionException e) {
                LOGGER.warn("Attempting to reconnect to Redis");
                // See storeRecord: the reconnect happens inside the try on the next iteration.
                discardClient();
            } catch (Exception e) {
                // Pass the throwable so the stack trace is not lost.
                LOGGER.warn("Reading from database schema history stream failed", e);
                LOGGER.warn("Will retry");
            }
            backoff.sleepWhen(true);
        }
    }

    /** Drops the current client so the next attempt reconnects; disconnect failures are only logged. */
    private void discardClient() {
        RedisClient stale = this.client;
        this.client = null;
        if (stale == null) {
            return;
        }
        try {
            stale.disconnect();
        } catch (Exception e) {
            // The client is already considered broken; nothing more can be done with it.
            LOGGER.debug("Ignoring failure while disconnecting stale Redis client", e);
        }
    }
}
