package io.debezium.storage.redis.offset;

import io.debezium.config.Configuration;
import io.debezium.storage.redis.RedisClient;
import io.debezium.storage.redis.RedisClientConnectionException;
import io.debezium.storage.redis.RedisConnection;
import io.smallrye.mutiny.Uni;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/storage/redis/offset/RedisOffsetBackingStore.class */
/**
 * Offset backing store that keeps the offsets of {@link MemoryOffsetBackingStore} in a Redis hash.
 *
 * <p>On {@link #start()} the hash named by {@code config.getRedisKeyName()} is read and copied into
 * the inherited {@code data} map; {@link #save()} writes every entry of {@code data} back to the
 * same hash. Both directions retry indefinitely with a back-off and re-create the Redis client
 * when a {@link RedisClientConnectionException} is observed.
 *
 * <p>All String/byte conversions use UTF-8 so that the stored bytes do not depend on the JVM's
 * default charset.
 */
public class RedisOffsetBackingStore extends MemoryOffsetBackingStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisOffsetBackingStore.class);

    /** Store settings; assigned in {@link #configure(WorkerConfig)}. */
    private RedisOffsetBackingStoreConfig config;

    /** Client used for all Redis calls; (re)assigned by {@link #connect()}. */
    private RedisClient client;

    /**
     * Builds a new Redis client from the current configuration and stores it in {@code client}.
     * Called once from {@link #start()} and again from the retry pipelines whenever a
     * {@link RedisClientConnectionException} is seen.
     */
    void connect() {
        // NOTE(review): a previously assigned client is replaced without being closed here;
        // confirm whether RedisClient needs an explicit close/disconnect on reconnect.
        this.client = new RedisConnection(
                this.config.getAddress(),
                this.config.getUser(),
                this.config.getPassword(),
                this.config.getConnectionTimeout().intValue(),
                this.config.getSocketTimeout().intValue(),
                this.config.isSslEnabled())
                .getRedisClient(
                        RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME,
                        this.config.isWaitEnabled(),
                        this.config.getWaitTimeout(),
                        this.config.isWaitRetryEnabled(),
                        this.config.getWaitRetryDelay());
    }

    /**
     * Configures the parent store and derives the Redis-specific settings from the worker's
     * original string properties.
     *
     * @param workerConfig worker configuration supplying the original properties
     */
    public void configure(WorkerConfig workerConfig) {
        super.configure(workerConfig);
        this.config = new RedisOffsetBackingStoreConfig(Configuration.from(workerConfig.originalsStrings()));
    }

    /**
     * Starts the parent store, connects to Redis and loads the stored offsets.
     * Blocks until the offsets have been read, because loading retries indefinitely.
     */
    public synchronized void start() {
        super.start();
        LOGGER.info("Starting RedisOffsetBackingStore");
        connect();
        load();
    }

    /**
     * Stops the parent store and logs the shutdown.
     */
    public synchronized void stop() {
        super.stop();
        // NOTE(review): the Redis client is not closed here; confirm whether it holds
        // resources that should be released on stop.
        LOGGER.info("Stopped RedisOffsetBackingStore");
    }

    /**
     * Reads the whole offsets hash from Redis and replaces {@code data} with its contents.
     * Failures are logged and retried forever, backing off between the configured initial and
     * maximum retry delays; connection failures additionally trigger {@link #connect()}.
     */
    private void load() {
        Map<?, ?> offsets = (Map<?, ?>) Uni.createFrom().item(() -> {
            return this.client.hgetAll(this.config.getRedisKeyName());
        }).onFailure().invoke(th -> {
            LOGGER.warn("Reading from Redis offset store failed with " + th);
            LOGGER.warn("Will retry");
        }).onFailure(RedisClientConnectionException.class).invoke(th2 -> {
            LOGGER.warn("Attempting to reconnect to Redis");
            connect();
        }).onFailure().retry().withBackOff(
                Duration.ofMillis(this.config.getInitialRetryDelay().intValue()),
                Duration.ofMillis(this.config.getMaxRetryDelay().intValue()))
                .indefinitely().invoke(fetched -> {
                    LOGGER.trace("Offsets fetched from Redis: {}", fetched);
                }).await().indefinitely();

        Map<ByteBuffer, ByteBuffer> loaded = new HashMap<>();
        for (Map.Entry<?, ?> entry : offsets.entrySet()) {
            // Entries are cast to String exactly as before; null keys/values are preserved.
            loaded.put(encode((String) entry.getKey()), encode((String) entry.getValue()));
        }
        this.data = loaded;
    }

    /**
     * Writes every entry of {@code data} to the Redis hash, one {@code hset} per entry.
     * Each write is retried forever with a back-off between one and two seconds; connection
     * failures additionally trigger {@link #connect()}.
     */
    protected void save() {
        // Encoded once per save instead of once per entry and per retry attempt.
        final byte[] redisKey = this.config.getRedisKeyName().getBytes(StandardCharsets.UTF_8);
        for (Map.Entry<ByteBuffer, ByteBuffer> entry : this.data.entrySet()) {
            final byte[] field = toBytes(entry.getKey());
            final byte[] value = toBytes(entry.getValue());
            Uni.createFrom().item(() -> {
                return Long.valueOf(this.client.hset(redisKey, field, value));
            }).onFailure().invoke(th -> {
                LOGGER.warn("Writing to Redis offset store failed with " + th);
                LOGGER.warn("Will retry");
            }).onFailure(RedisClientConnectionException.class).invoke(th2 -> {
                LOGGER.warn("Attempting to reconnect to Redis");
                connect();
            // NOTE(review): unlike load(), this back-off is hard-coded rather than taken from
            // the configured retry delays; kept as-is to preserve existing timing.
            }).onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(2L)).indefinitely().invoke(written -> {
                if (LOGGER.isTraceEnabled()) {
                    // Log the decoded value; concatenating the byte[] would only print its identity.
                    LOGGER.trace("Offsets written to Redis: {}",
                            value != null ? new String(value, StandardCharsets.UTF_8) : null);
                }
            }).await().indefinitely();
        }
    }

    /**
     * Wraps the UTF-8 bytes of {@code text} in a buffer.
     *
     * @param text string to encode, may be {@code null}
     * @return a buffer over the encoded bytes, or {@code null} when {@code text} is {@code null}
     */
    private static ByteBuffer encode(String text) {
        return text != null ? ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)) : null;
    }

    /**
     * Extracts the bytes of {@code buffer} for writing to Redis.
     *
     * @param buffer buffer to convert, may be {@code null}
     * @return the backing array when the buffer exposes one (same as the previous behaviour),
     *         a copy of the remaining bytes otherwise, or {@code null} for a {@code null} buffer
     */
    private static byte[] toBytes(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        if (buffer.hasArray()) {
            return buffer.array();
        }
        // Direct or read-only buffers have no accessible array; copy via a duplicate so the
        // caller's position/limit are left untouched.
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }
}
