package io.debezium.kafka;

import io.debezium.junit.SkipLongRunning;
import io.debezium.junit.SkipTestRule;
import io.debezium.kafka.KafkaCluster;
import io.debezium.util.Collect;
import io.debezium.util.Stopwatch;
import io.debezium.util.Testing;
import java.io.File;
import java.lang.reflect.Field;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/* loaded from: input_file:io/debezium/kafka/KafkaClusterTest.class */
/**
 * Tests for {@link KafkaCluster}: starting and stopping clusters with one or more brokers,
 * checking what happens to the broker data directories on shutdown, exchanging messages through
 * the cluster's producer/consumer helpers, and propagating custom Kafka configuration.
 *
 * <p>Each test gets a fresh {@link KafkaCluster} rooted in its own testing directory; both are
 * released again in {@link #afterEach()}.
 */
public class KafkaClusterTest {

    /** Topic used by every produce/consume test. */
    private static final String TOPIC_NAME = "topicA";

    /** Maximum time, in seconds, granted to consumers and to the completion latches. */
    private static final long TIMEOUT_SECONDS = 10L;

    // NOTE(review): presumably this rule is what honours @SkipLongRunning - confirm in SkipTestRule.
    @Rule
    public TestRule skipTestRule = new SkipTestRule();
    private KafkaCluster cluster;
    private File dataDir;

    /**
     * Creates the testing directory and a cluster configured to wipe its data both before startup
     * and upon shutdown. No broker is added or started here; each test does that itself.
     */
    @Before
    public void beforeEach() {
        this.dataDir = Testing.Files.createTestingDirectory("cluster");
        this.cluster = new KafkaCluster()
                .usingDirectory(this.dataDir)
                .deleteDataPriorToStartup(true)
                .deleteDataUponShutdown(true)
                .withKafkaConfiguration(Collect.propertiesOf(KafkaConfig.ZkSessionTimeoutMsProp(), "20000"));
    }

    /**
     * Shuts the cluster down and removes the testing directory.
     *
     * <p>The directory is deleted in a {@code finally} block so that it is cleaned up even when the
     * shutdown throws, and both fields are null-checked so that a failure inside
     * {@link #beforeEach()} is not masked by a {@link NullPointerException} raised here.
     */
    @After
    public void afterEach() {
        try {
            if (this.cluster != null) {
                this.cluster.shutdown();
            }
        } finally {
            if (this.dataDir != null) {
                Testing.Files.delete(this.dataDir);
            }
        }
    }

    /** A single-broker cluster has valid data directories while running and none after shutdown. */
    @Test
    @SkipLongRunning
    public void shouldStartClusterWithOneBrokerAndRemoveData() throws Exception {
        this.cluster.addBrokers(1).startup();
        this.cluster.onEachDirectory(this::assertValidDataDirectory);
        this.cluster.shutdown();
        this.cluster.onEachDirectory(this::assertDoesNotExist);
    }

    /** A three-broker cluster has valid data directories while running and none after shutdown. */
    @Test
    @SkipLongRunning
    public void shouldStartClusterWithMultipleBrokerAndRemoveData() throws Exception {
        this.cluster.addBrokers(3).startup();
        this.cluster.onEachDirectory(this::assertValidDataDirectory);
        this.cluster.shutdown();
        this.cluster.onEachDirectory(this::assertDoesNotExist);
    }

    /** With delete-upon-shutdown disabled, a single-broker cluster keeps its data directories. */
    @Test
    @SkipLongRunning
    public void shouldStartClusterWithOneBrokerAndLeaveData() throws Exception {
        this.cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
        this.cluster.onEachDirectory(this::assertValidDataDirectory);
        this.cluster.shutdown();
        this.cluster.onEachDirectory(this::assertValidDataDirectory);
    }

    /** With delete-upon-shutdown disabled, a three-broker cluster keeps its data directories. */
    @Test
    @SkipLongRunning
    public void shouldStartClusterWithMultipleBrokerAndLeaveData() throws Exception {
        this.cluster.deleteDataUponShutdown(false).addBrokers(3).startup();
        this.cluster.onEachDirectory(this::assertValidDataDirectory);
        this.cluster.shutdown();
        this.cluster.onEachDirectory(this::assertValidDataDirectory);
    }

    /**
     * Produces 100 integers through the cluster's helper and checks that the consumer callback was
     * invoked exactly 100 times.
     */
    @Test
    @SkipLongRunning
    public void shouldStartClusterAndAllowProducersAndConsumersToUseIt() throws Exception {
        Testing.Debug.enable();
        // One count for the consumer's completion callback, one for the producer's.
        CountDownLatch completion = new CountDownLatch(2);
        AtomicLong messagesRead = new AtomicLong(0L);
        this.cluster.addBrokers(1).startup();
        this.cluster.createTopics(TOPIC_NAME);

        Stopwatch stopwatch = Stopwatch.reusable().start();
        this.cluster.useTo().consumeIntegers(TOPIC_NAME, 100, TIMEOUT_SECONDS, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
            messagesRead.incrementAndGet();
            return true;
        });
        this.cluster.useTo().produceIntegers(TOPIC_NAME, 100, 1, completion::countDown);

        awaitAndReport(completion, stopwatch,
                "Both consumer and producer completed normally in ",
                "Consumer and/or producer did not completed normally");
        Assertions.assertThat(messagesRead.get()).isEqualTo(100L);
    }

    /**
     * Writes three messages with an explicitly created producer and checks that the consumer
     * callback was invoked exactly three times.
     */
    @Test
    public void shouldStartClusterAndAllowInteractiveProductionAndAutomaticConsumersToUseIt() throws Exception {
        Testing.Debug.enable();
        // Only the consumer reports completion here; the producer is driven synchronously below.
        CountDownLatch completion = new CountDownLatch(1);
        AtomicLong messagesRead = new AtomicLong(0L);
        this.cluster.addBrokers(1).startup();
        this.cluster.createTopics(TOPIC_NAME);

        Stopwatch stopwatch = Stopwatch.reusable().start();
        this.cluster.useTo().consumeIntegers(TOPIC_NAME, 3, TIMEOUT_SECONDS, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
            messagesRead.incrementAndGet();
            return true;
        });
        this.cluster.useTo()
                .createProducer("manual", new StringSerializer(), new IntegerSerializer())
                .write(TOPIC_NAME, "key1", 1)
                .write(TOPIC_NAME, "key2", 2)
                .write(TOPIC_NAME, "key3", 3)
                .close();

        awaitAndReport(completion, stopwatch,
                "The consumer completed normally in ",
                "Consumer did not completed normally");
        Assertions.assertThat(messagesRead.get()).isEqualTo(3L);
    }

    /**
     * Writes three messages from a producer callback handed to the cluster and checks that the
     * consumer callback was invoked exactly three times.
     */
    @Test
    @SkipLongRunning
    public void shouldStartClusterAndAllowAsynchronousProductionAndAutomaticConsumersToUseIt() throws Exception {
        Testing.Debug.enable();
        // One count for the consumer's completion callback, one released by the producer lambda.
        CountDownLatch completion = new CountDownLatch(2);
        AtomicLong messagesRead = new AtomicLong(0L);
        this.cluster.addBrokers(1).startup();
        this.cluster.createTopics(TOPIC_NAME);

        Stopwatch stopwatch = Stopwatch.reusable().start();
        this.cluster.useTo().consumeIntegers(TOPIC_NAME, 3, TIMEOUT_SECONDS, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
            messagesRead.incrementAndGet();
            return true;
        });
        this.cluster.useTo().produce("manual", new StringSerializer(), new IntegerSerializer(), producer -> {
            producer.write(TOPIC_NAME, "key1", 1);
            producer.write(TOPIC_NAME, "key2", 2);
            producer.write(TOPIC_NAME, "key3", 3);
            completion.countDown();
        });

        awaitAndReport(completion, stopwatch,
                "The consumer completed normally in ",
                "Consumer did not completed normally");
        Assertions.assertThat(messagesRead.get()).isEqualTo(3L);
    }

    /**
     * Checks, via reflection on the private {@code kafkaConfig} field, that a property passed to
     * {@code withKafkaConfiguration} is stored on the cluster.
     */
    @Test
    public void shouldSetClusterConfigProperty() throws Exception {
        Properties customConfig = new Properties();
        customConfig.put("foo", "bar");
        KafkaCluster kafkaCluster = new KafkaCluster().withKafkaConfiguration(customConfig);

        Field kafkaConfigField = KafkaCluster.class.getDeclaredField("kafkaConfig");
        kafkaConfigField.setAccessible(true);
        Properties storedConfig = (Properties) kafkaConfigField.get(kafkaCluster);

        Assertions.assertThat(storedConfig).hasSize(1);
    }

    /**
     * Checks, via reflection on the private {@code kafkaServers} field, that a property passed to
     * {@code withKafkaConfiguration} reaches the configuration of a subsequently added broker.
     */
    @Test
    public void shouldSetServerConfigProperty() throws Exception {
        Properties customConfig = new Properties();
        customConfig.put("foo", "bar");
        KafkaCluster kafkaCluster = new KafkaCluster().withKafkaConfiguration(customConfig).addBrokers(1);

        Field kafkaServersField = KafkaCluster.class.getDeclaredField("kafkaServers");
        kafkaServersField.setAccessible(true);
        // Wildcard instead of a raw type: only the values are needed, and they are cast explicitly.
        ConcurrentMap<?, ?> servers = (ConcurrentMap<?, ?>) kafkaServersField.get(kafkaCluster);
        KafkaServer firstServer = (KafkaServer) servers.values().iterator().next();

        Assertions.assertThat(firstServer.config().get("foo")).isEqualTo("bar");
    }

    /**
     * Asserts that {@code file} is an existing, readable and writable directory located inside the
     * test data directory.
     *
     * @param file the directory to check
     */
    protected void assertValidDataDirectory(File file) {
        Assertions.assertThat(file.exists()).isTrue();
        Assertions.assertThat(file.isDirectory()).isTrue();
        Assertions.assertThat(file.canWrite()).isTrue();
        Assertions.assertThat(file.canRead()).isTrue();
        Assertions.assertThat(Testing.Files.inTestDataDir(file)).isTrue();
    }

    /**
     * Asserts that {@code file} does not exist.
     *
     * @param file the path that must be absent
     */
    protected void assertDoesNotExist(File file) {
        Assertions.assertThat(file.exists()).isFalse();
    }

    /**
     * Waits up to {@link #TIMEOUT_SECONDS} seconds for {@code completion} to reach zero and emits a
     * debug message describing the outcome. A timeout is only reported, not treated as a failure;
     * the calling test decides the result through its own assertion.
     *
     * @param completion the latch counted down by the participants of the test
     * @param stopwatch the running stopwatch; stopped only when the latch completed in time
     * @param successPrefix debug text to which the measured durations are appended on success
     * @param timeoutMessage debug text emitted when the latch did not complete in time
     * @throws InterruptedException if the current thread is interrupted while waiting
     */
    private static void awaitAndReport(CountDownLatch completion, Stopwatch stopwatch, String successPrefix, String timeoutMessage)
            throws InterruptedException {
        if (completion.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            stopwatch.stop();
            Testing.debug(successPrefix + stopwatch.durations());
        } else {
            Testing.debug(timeoutMessage);
        }
    }
}
