package io.debezium.server.redis;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.doc.FixFor;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import java.time.Duration;
import javax.enterprise.event.Observes;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;

@QuarkusTest
@QuarkusTestResource.List({@QuarkusTestResource(PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(RedisTestResourceLifecycleManager.class)})
/* loaded from: input_file:io/debezium/server/redis/RedisStreamIT.class */
public class RedisStreamIT {

    @ConfigProperty(name = "debezium.source.database.hostname")
    String dbHostname;

    @ConfigProperty(name = "debezium.source.database.port")
    String dbPort;

    @ConfigProperty(name = "debezium.source.database.user")
    String dbUser;

    @ConfigProperty(name = "debezium.source.database.password")
    String dbPassword;

    @ConfigProperty(name = "debezium.source.database.dbname")
    String dbName;
    protected static Jedis jedis;

    public RedisStreamIT() {
        Testing.Files.delete(RedisTestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(RedisTestConfigSource.OFFSET_STORE_PATH);
    }

    void setupDependencies(@Observes ConnectorStartedEvent connectorStartedEvent) {
        Testing.Print.enable();
        jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) throws Exception {
        if (!connectorCompletedEvent.isSuccess()) {
            throw ((Exception) connectorCompletedEvent.getError().get());
        }
    }

    private PostgresConnection getPostgresConnection() {
        return new PostgresConnection(Configuration.create().with("hostname", this.dbHostname).with("port", this.dbPort).with("user", this.dbUser).with("password", this.dbPassword).with("dbname", this.dbName).build());
    }

    private Long getStreamLength(String str, int i) {
        Awaitility.await().atMost(Duration.ofSeconds(RedisTestConfigSource.waitForSeconds())).until(() -> {
            return Boolean.valueOf(jedis.xlen(str).longValue() == ((long) i));
        });
        return jedis.xlen(str);
    }

    @Test
    public void testRedisStream() throws Exception {
        Assert.assertTrue("Redis Basic Stream Test Failed", getStreamLength("testc.inventory.customers", 4).longValue() == 4);
    }

    @FixFor({"DBZ-4510"})
    @Test
    public void testRedisConnectionRetry() throws Exception {
        Testing.print("Pausing container");
        RedisTestResourceLifecycleManager.pause();
        PostgresConnection postgresConnection = getPostgresConnection();
        Testing.print("Creating new redis_test table and inserting 5 records to it");
        postgresConnection.execute(new String[]{"CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)", "INSERT INTO inventory.redis_test VALUES (1)", "INSERT INTO inventory.redis_test VALUES (2)", "INSERT INTO inventory.redis_test VALUES (3)", "INSERT INTO inventory.redis_test VALUES (4)", "INSERT INTO inventory.redis_test VALUES (5)"});
        postgresConnection.close();
        Testing.print("Sleeping for 5 seconds to simulate no connection errors");
        Thread.sleep(5000L);
        Testing.print("Unpausing container");
        RedisTestResourceLifecycleManager.unpause();
        Long streamLength = getStreamLength("testc.inventory.redis_test", 5);
        Testing.print("Entries in testc.inventory.redis_test:" + streamLength);
        Assert.assertTrue("Redis Connection Test Failed", streamLength.longValue() == 5);
    }

    @FixFor({"DBZ-4510"})
    @Test
    public void testRedisOOMRetry() throws Exception {
        Testing.print("Setting Redis' maxmemory to 2M");
        jedis.configSet("maxmemory", "1M");
        PostgresConnection postgresConnection = getPostgresConnection();
        postgresConnection.execute(new String[]{"CREATE TABLE inventory.redis_test2 (id VARCHAR(100) PRIMARY KEY, first_name VARCHAR(100), last_name VARCHAR(100))", String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", 10)});
        Testing.print("Entries in testc.inventory.redis_test2:" + getStreamLength("testc.inventory.redis_test2", 10));
        postgresConnection.execute(new String[]{"DELETE FROM inventory.redis_test2"});
        Testing.print("Entries in testc.inventory.redis_test2:" + getStreamLength("testc.inventory.redis_test2", 30));
        postgresConnection.execute(new String[]{String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", 22)});
        postgresConnection.close();
        Thread.sleep(1000L);
        Testing.print("Deleting stream in order to free memory");
        jedis.del("testc.inventory.redis_test2");
        Long streamLength = getStreamLength("testc.inventory.redis_test2", 22);
        Testing.print("Entries in testc.inventory.redis_test2:" + streamLength);
        jedis.configSet("maxmemory", "0");
        Assert.assertTrue("Redis OOM Test Failed", streamLength.longValue() == 22);
    }
}
