package io.debezium.server.redis;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.util.DelayStrategy;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.inject.Named;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;

@Dependent
@Named("redis")
/* loaded from: input_file:io/debezium/server/redis/RedisStreamChangeConsumer.class */
public class RedisStreamChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.redis.";
    private static final String PROP_ADDRESS = "debezium.sink.redis.address";
    private static final String PROP_USER = "debezium.sink.redis.user";
    private static final String PROP_PASSWORD = "debezium.sink.redis.password";
    private HostAndPort address;
    private Optional<String> user;
    private Optional<String> password;

    @ConfigProperty(name = "debezium.sink.redis.batch.size", defaultValue = "500")
    Integer batchSize;

    @ConfigProperty(name = "debezium.sink.redis.retry.initial.delay.ms", defaultValue = "300")
    Integer initialRetryDelay;

    @ConfigProperty(name = "debezium.sink.redis.retry.max.delay.ms", defaultValue = "10000")
    Integer maxRetryDelay;

    @ConfigProperty(name = "debezium.sink.redis.null.key", defaultValue = "default")
    String nullKey;

    @ConfigProperty(name = "debezium.sink.redis.null.value", defaultValue = "default")
    String nullValue;
    private Jedis client = null;

    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        this.address = HostAndPort.from((String) config.getValue(PROP_ADDRESS, String.class));
        this.user = config.getOptionalValue(PROP_USER, String.class);
        this.password = config.getOptionalValue(PROP_PASSWORD, String.class);
        this.client = new Jedis(this.address);
        if (this.user.isPresent()) {
            this.client.auth(this.user.get(), this.password.get());
        } else if (this.password.isPresent()) {
            this.client.auth(this.password.get());
        } else {
            this.client.ping();
        }
        LOGGER.info("Using Jedis '{}'", this.client);
    }

    @PreDestroy
    void close() {
        try {
            this.client.close();
        } catch (Exception e) {
            LOGGER.warn("Exception while closing Jedis: {}", this.client, e);
        } finally {
            this.client = null;
        }
    }

    private <T> Stream<List<T>> batches(List<T> list, int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("length = " + i);
        }
        int size = list.size();
        if (size <= 0) {
            return Stream.empty();
        }
        int i2 = (size - 1) / i;
        return IntStream.range(0, i2 + 1).mapToObj(i3 -> {
            return list.subList(i3 * i, i3 == i2 ? size : (i3 + 1) * i);
        });
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        DelayStrategy exponential = DelayStrategy.exponential(this.initialRetryDelay.intValue(), this.maxRetryDelay.intValue());
        LOGGER.info("Handling a batch of {} records", Integer.valueOf(list.size()));
        batches(list, this.batchSize.intValue()).forEach(list2 -> {
            boolean z = false;
            while (!z) {
                if (this.client == null) {
                    try {
                        connect();
                    } catch (Exception e) {
                        close();
                        LOGGER.error("Can't connect to Redis", e);
                    }
                } else {
                    try {
                        LOGGER.info("Preparing a Redis Transaction of {} records", Integer.valueOf(list2.size()));
                        Transaction multi = this.client.multi();
                        Iterator it = list2.iterator();
                        while (it.hasNext()) {
                            ChangeEvent changeEvent = (ChangeEvent) it.next();
                            multi.xadd(this.streamNameMapper.map(changeEvent.destination()), (StreamEntryID) null, Collections.singletonMap(changeEvent.key() != null ? getString(changeEvent.key()) : this.nullKey, changeEvent.value() != null ? getString(changeEvent.value()) : this.nullValue));
                        }
                        multi.exec();
                        Iterator it2 = list2.iterator();
                        while (it2.hasNext()) {
                            recordCommitter.markProcessed((ChangeEvent) it2.next());
                        }
                        z = true;
                    } catch (JedisConnectionException e2) {
                        close();
                    } catch (Exception e3) {
                        LOGGER.error("Unexpected Exception", e3);
                        throw new DebeziumException(e3);
                    } catch (JedisDataException e4) {
                        if (e4.getMessage().equals("EXECABORT Transaction discarded because of previous errors.") || e4.getMessage().equals("EXECABORT Transaction discarded because of: OOM command not allowed when used memory > 'maxmemory'.")) {
                            LOGGER.error("Redis runs OOM", e4);
                        } else {
                            if (!e4.getMessage().equals("LOADING Redis is loading the dataset in memory")) {
                                LOGGER.error("Unexpected JedisDataException", e4);
                                throw new DebeziumException(e4);
                            }
                            LOGGER.error("Redis is starting", e4);
                        }
                    }
                }
                exponential.sleepWhen(!z);
            }
        });
        recordCommitter.markBatchFinished();
    }
}
