package io.debezium.server.redis;

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.doc.FixFor;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.quarkus.test.junit.TestProfile;
import java.util.Map;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;

/**
 * Integration tests checking that offsets are written to the Redis hash
 * {@value #OFFSETS_HASH_NAME}, both in the normal case and after the Redis
 * container has been paused and resumed.
 *
 * <p>Runs with {@link RedisOffsetTestProfile} and the Redis container managed by
 * {@link RedisTestResourceLifecycleManager}.
 */
@TestProfile(RedisOffsetTestProfile.class)
@QuarkusIntegrationTest
@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
public class RedisOffsetIT {
    /** Minimum stream length awaited before the offsets hash is inspected. */
    private static final int MESSAGE_COUNT = 4;
    /** Redis stream whose length is awaited in {@link #testRedisStream()}. */
    private static final String STREAM_NAME = "testc.inventory.customers";
    /** Redis hash that is expected to contain the stored offsets. */
    private static final String OFFSETS_HASH_NAME = "metadata:debezium:offsets";
    /** Client used by {@link #testRedisStream()}; closed again when that test finishes. */
    protected static Jedis jedis;

    /**
     * Awaits at least {@link #MESSAGE_COUNT} entries in {@link #STREAM_NAME} and then
     * asserts that the offsets hash is not empty.
     *
     * @throws Exception if the await helper or the Redis client fails
     */
    @FixFor({"DBZ-4509"})
    @Test
    public void testRedisStream() throws Exception {
        jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
        try {
            TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT);
            // Asserting on the map itself yields a failure message that shows its content.
            Assertions.assertThat(jedis.hgetAll(OFFSETS_HASH_NAME)).isNotEmpty();
        }
        finally {
            // Release the connection even when the await or the assertion fails.
            jedis.close();
        }
    }

    /**
     * Deletes the offsets hash, pauses the Redis container, inserts rows into a new
     * PostgreSQL table while Redis is paused, resumes the container and asserts that
     * the offsets hash is populated again.
     *
     * @throws Exception if any helper, the database connection or the Redis client fails
     */
    @FixFor({"DBZ-4509"})
    @Test
    public void testRedisConnectionRetry() throws Exception {
        Testing.Print.enable();
        try (Jedis client = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()))) {
            // Wait for an initial offset, then remove it so a later entry must be newly written.
            TestUtils.awaitHashSizeGte(client, OFFSETS_HASH_NAME, 1);
            client.del(OFFSETS_HASH_NAME);

            Testing.print("Pausing container");
            RedisTestResourceLifecycleManager.pause();
            try {
                try (PostgresConnection connection = TestUtils.getPostgresConnection()) {
                    Testing.print("Creating new redis_test table and inserting 5 records to it");
                    connection.execute(new String[]{
                            "CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)",
                            "INSERT INTO inventory.redis_test VALUES (1)",
                            "INSERT INTO inventory.redis_test VALUES (2)",
                            "INSERT INTO inventory.redis_test VALUES (3)",
                            "INSERT INTO inventory.redis_test VALUES (4)",
                            "INSERT INTO inventory.redis_test VALUES (5)"
                    });
                }
                Testing.print("Sleeping for 2 seconds to flush records");
                Thread.sleep(2000L);
            }
            finally {
                // Always resume the container so a failure here cannot leave it paused
                // for whatever runs next against the same test resource.
                Testing.print("Unpausing container");
                RedisTestResourceLifecycleManager.unpause();
            }

            Testing.print("Waiting for reconnect to redis and for the offset to be written");
            TestUtils.awaitHashSizeGte(client, OFFSETS_HASH_NAME, 1);
            Map<String, String> offsets = client.hgetAll(OFFSETS_HASH_NAME);
            Assertions.assertThat(offsets).isNotEmpty();
        }
    }
}
