package io.debezium.embedded;

import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.engine.DebeziumEngine;
import io.debezium.util.Collect;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/embedded/EmbeddedEngineTest.class */
public class EmbeddedEngineTest extends AbstractConnectorTest {
    private static final int NUMBER_OF_LINES = 10;
    private static final Path TEST_FILE_PATH = Testing.Files.createTestingPath("file-connector-input.txt").toAbsolutePath();
    private static final Charset UTF8 = StandardCharsets.UTF_8;
    private File inputFile;
    private int nextConsumedLineNumber;
    private int linesAdded;
    private Configuration connectorConfig;

    @Before
    public void beforeEach() throws Exception {
        this.nextConsumedLineNumber = 1;
        this.linesAdded = 0;
        Testing.Files.delete(TEST_FILE_PATH);
        this.inputFile = Testing.Files.createTestingFile(TEST_FILE_PATH);
        this.connectorConfig = Configuration.create().with("file", TEST_FILE_PATH).with("topic", "topicX").build();
    }

    @Test
    public void shouldStartAndUseFileConnectorUsingMemoryOffsetStorage() throws Exception {
        appendLinesToSource(NUMBER_OF_LINES);
        start(FileStreamSourceConnector.class, this.connectorConfig);
        consumeLines(NUMBER_OF_LINES);
        assertNoRecordsToConsume();
        for (int i = 1; i != 5; i++) {
            appendLinesToSource(NUMBER_OF_LINES);
            consumeLines(NUMBER_OF_LINES);
            assertNoRecordsToConsume();
        }
        stopConnector();
        appendLinesToSource(NUMBER_OF_LINES);
        assertNoRecordsToConsume();
        start(FileStreamSourceConnector.class, this.connectorConfig);
        consumeLines(NUMBER_OF_LINES);
        assertNoRecordsToConsume();
    }

    @Test
    @FixFor({"DBZ-1080"})
    public void shouldWorkToUseCustomChangeConsumer() throws Exception {
        appendLinesToSource(NUMBER_OF_LINES);
        Configuration build = Configuration.copy(this.connectorConfig).with(EmbeddedEngine.ENGINE_NAME, "testing-connector").with(EmbeddedEngine.CONNECTOR_CLASS, FileStreamSourceConnector.class).with("offset.storage.file.filename", OFFSET_STORE_PATH).with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0).build();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(6);
        this.engine = EmbeddedEngine.create().using(build).notifying((list, recordCommitter) -> {
            Assertions.assertThat(list.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES);
            Integer valueOf = Integer.valueOf(list.size() / NUMBER_OF_LINES);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                recordCommitter.markProcessed((SourceRecord) it.next());
            }
            recordCommitter.markBatchFinished();
            countDownLatch.countDown();
            for (int i = 0; i < valueOf.intValue(); i++) {
                countDownLatch2.countDown();
            }
        }).using(getClass().getClassLoader()).build();
        Executors.newFixedThreadPool(1).execute(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });
        countDownLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(countDownLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; i++) {
            appendLinesToSource(NUMBER_OF_LINES);
            Thread.sleep(10L);
        }
        countDownLatch2.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(countDownLatch2.getCount()).isEqualTo(0L);
        stopConnector();
    }

    @Test
    public void shouldRunDebeziumEngine() throws Exception {
        appendLinesToSource(NUMBER_OF_LINES);
        Properties properties = new Properties();
        properties.setProperty("name", "debezium-engine");
        properties.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        properties.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        properties.setProperty("offset.flush.interval.ms", "0");
        properties.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        properties.setProperty("topic", "topicX");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(6);
        DebeziumEngine build = DebeziumEngine.create(Connect.class).using(properties).notifying((list, recordCommitter) -> {
            Assertions.assertThat(list.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES);
            Integer valueOf = Integer.valueOf(list.size() / NUMBER_OF_LINES);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                recordCommitter.markProcessed((SourceRecord) it.next());
            }
            recordCommitter.markBatchFinished();
            countDownLatch.countDown();
            for (int i = 0; i < valueOf.intValue(); i++) {
                countDownLatch2.countDown();
            }
        }).using(getClass().getClassLoader()).build();
        Executors.newFixedThreadPool(1).execute(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            build.run();
        });
        countDownLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(countDownLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; i++) {
            appendLinesToSource(NUMBER_OF_LINES);
            Thread.sleep(10L);
        }
        countDownLatch2.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(countDownLatch2.getCount()).isEqualTo(0L);
        stopConnector();
    }

    protected void appendLinesToSource(int i) throws IOException {
        CharSequence[] charSequenceArr = new CharSequence[i];
        for (int i2 = 0; i2 != i; i2++) {
            charSequenceArr[i2] = generateLine(this.linesAdded + i2 + 1);
        }
        Files.write(this.inputFile.toPath(), Collect.arrayListOf(charSequenceArr), UTF8, StandardOpenOption.APPEND);
        this.linesAdded += i;
    }

    protected String generateLine(int i) {
        return "Generated line number " + i;
    }

    protected void consumeLines(int i) throws InterruptedException {
        consumeRecords(i, 3, sourceRecord -> {
            Assertions.assertThat(sourceRecord.value().toString()).isEqualTo(generateLine(this.nextConsumedLineNumber));
            this.nextConsumedLineNumber++;
        }, false);
    }
}
