package io.debezium.embedded.async;

import ch.qos.logback.classic.Level;
import io.debezium.DebeziumException;
import io.debezium.connector.simple.SimpleSourceConnector;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.DebeziumEngineTestUtils;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.embedded.async.AsyncEmbeddedEngine;
import io.debezium.embedded.async.DebeziumAsyncEngineTestUtils;
import io.debezium.engine.DebeziumEngine;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngineTest.class */
public class AsyncEmbeddedEngineTest {
    // Number of lines the tests append to the input file per appendLinesToSource(NUMBER_OF_LINES) call.
    private static final int NUMBER_OF_LINES = 10;
    // Engine under test; every test builds its own instance and assigns it here.
    protected DebeziumEngine<SourceRecord> engine;
    // Single-threaded executor on which tests submit engine.run(); stopEngine() calls shutdownNow() on it on failure.
    protected ExecutorService engineExecSrv = Executors.newFixedThreadPool(1);
    // Input file created in beforeEach() at TEST_FILE_PATH; appendLinesToSource() writes to it.
    private File inputFile;
    // Running total of lines appended to inputFile; reset to 0 in beforeEach().
    private int linesAdded;
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncEmbeddedEngineTest.class);
    // Absolute path used as "offset.storage.file.filename"; deleted before each test.
    protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath("file-connector-offsets.txt").toAbsolutePath();
    // Absolute path of the file-connector input file; deleted and re-created before each test.
    private static final Path TEST_FILE_PATH = Testing.Files.createTestingPath("file-connector-input.txt").toAbsolutePath();
    // Set to true by connectorStarted() and to false by connectorStopped() in the connector callbacks of this class.
    protected static final AtomicBoolean isEngineRunning = new AtomicBoolean(false);
    // Incremented by taskStarted() and decremented by taskStopped() in TestEngineConnectorCallback.
    protected static final AtomicInteger runningTasks = new AtomicInteger(0);

    /* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngineTest$TestEngineConnectorCallback.class */
    public static class TestEngineConnectorCallback implements DebeziumEngine.ConnectorCallback {
        public void taskStarted() {
            AsyncEmbeddedEngineTest.runningTasks.incrementAndGet();
        }

        public void taskStopped() {
            AsyncEmbeddedEngineTest.runningTasks.decrementAndGet();
        }

        public void connectorStarted() {
            AsyncEmbeddedEngineTest.isEngineRunning.compareAndExchange(false, true);
        }

        public void connectorStopped() {
            AsyncEmbeddedEngineTest.isEngineRunning.set(false);
        }
    }

    /* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngineTest$WaitInTaskStartConnector.class */
    static class WaitInTaskStartConnector extends SimpleSourceConnector {
        WaitInTaskStartConnector() {
        }

        public Class<? extends Task> taskClass() {
            return WaitInTaskStartTask.class;
        }
    }

    /* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngineTest$WaitInTaskStartTask.class */
    static class WaitInTaskStartTask extends SimpleSourceConnector.SimpleConnectorTask {
        public static CountDownLatch taskStartingLatch = new CountDownLatch(1);
        public static CountDownLatch continueLatch = new CountDownLatch(1);

        WaitInTaskStartTask() {
        }

        public void start(Map<String, String> map) {
            taskStartingLatch.countDown();
            try {
                continueLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                throw new DebeziumException("Waiting for continuation of start was interrupted.");
            }
        }

        public List<SourceRecord> poll() throws InterruptedException {
            return new ArrayList();
        }
    }

    /**
     * Resets the state shared between tests: clears the running flag and task counter, removes the
     * offset store and input files left by a previous test, and creates a fresh, empty input file.
     */
    @Before
    public void beforeEach() throws Exception {
        // Static lifecycle state may carry over from a previously executed test.
        runningTasks.set(0);
        isEngineRunning.set(false);

        // Start from clean files.
        Testing.Files.delete(OFFSET_STORE_PATH);
        Testing.Files.delete(TEST_FILE_PATH);
        this.inputFile = Testing.Files.createTestingFile(TEST_FILE_PATH);
        this.linesAdded = 0;
    }

    /**
     * Runs the engine on a file source with a batch-level consumer and checks that the initially
     * written lines and the lines appended while the engine is running are all delivered.
     */
    @Test
    public void testEngineBasicLifecycle() throws Exception {
        final Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("tasks.max", "1");
        props.setProperty("connector.class", FileStreamSourceConnector.class.getName());
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "testTopic");

        appendLinesToSource(NUMBER_OF_LINES);

        // firstBatch is released by the first delivered batch;
        // allBatches is counted down once per NUMBER_OF_LINES records contained in a batch.
        final CountDownLatch firstBatch = new CountDownLatch(1);
        final CountDownLatch allBatches = new CountDownLatch(6);

        this.engine = new AsyncEmbeddedEngine.AsyncEngineBuilder()
                .using(props)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {
                    Assertions.assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES);
                    final int groups = records.size() / NUMBER_OF_LINES;
                    for (SourceRecord rec : records) {
                        committer.markProcessed(rec);
                    }
                    committer.markBatchFinished();
                    firstBatch.countDown();
                    for (int group = 0; group < groups; group++) {
                        allBatches.countDown();
                    }
                })
                .build();

        this.engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        firstBatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
        Assertions.assertThat(firstBatch.getCount()).isEqualTo(0L);

        // Feed five more chunks of lines while the engine is running.
        for (int round = 1; round <= 5; round++) {
            appendLinesToSource(NUMBER_OF_LINES);
            Thread.sleep(10L);
        }

        allBatches.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
        Assertions.assertThat(allBatches.getCount()).isEqualTo(0L);

        stopEngine();
    }

    /**
     * Runs a connector configured with five tasks and waits until the change consumer has seen
     * exactly 50 records in total, then stops the engine.
     */
    @Test
    public void testRunMultipleTasks() throws Exception {
        final Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.setProperty("tasks.max", String.valueOf(5));
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), DebeziumAsyncEngineTestUtils.MultiTaskSimpleSourceConnector.class.getName());
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        // NOTE(review): the value is an Integer, not a String; Properties#getProperty ignores non-String
        // values, so confirm whether this setting is actually picked up. Left unchanged on purpose because
        // the expected record count below may depend on the current effective value.
        props.put("batch.count", 1);

        final AtomicInteger recordsRead = new AtomicInteger(0);
        this.engine = new AsyncEmbeddedEngine.AsyncEngineBuilder()
                .using(props)
                .notifying((records, committer) -> {
                    for (SourceRecord rec : records) {
                        recordsRead.incrementAndGet();
                        committer.markProcessed(rec);
                    }
                })
                .using(getClass().getClassLoader())
                .build();

        // Run on the shared test executor (instead of a throw-away pool that is never shut down) so that
        // stopEngine() can interrupt the engine thread through engineExecSrv.shutdownNow() if needed.
        this.engineExecSrv.execute(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        Awaitility.await()
                .alias("Haven't read all the records in time")
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS)
                .until(() -> recordsRead.get() == 50);

        stopEngine();
    }

    /**
     * Starts a connector with ten tasks whose connector class is
     * {@code AlmostRandomlyFailingDuringStartConnector}, waits until at least one task is reported as
     * started, and then checks that the running-task counter drops back to zero and the engine stops.
     */
    @Test
    public void testTasksAreStoppedIfSomeFailsToStart() {
        // Task count is unrelated to the number of lines used by the file-based tests.
        final int taskCount = 10;

        final Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.setProperty("tasks.max", String.valueOf(taskCount));
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), DebeziumAsyncEngineTestUtils.AlmostRandomlyFailingDuringStartConnector.class.getName());
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.put("batch.count", 1);
        props.put(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, "10");

        this.engine = new AsyncEmbeddedEngine.AsyncEngineBuilder()
                .using(props)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {
                    // Records are irrelevant for this test.
                })
                .using(getClass().getClassLoader())
                .build();

        // Use the shared test executor rather than leaking a new, never shut down thread pool.
        this.engineExecSrv.execute(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        Awaitility.await()
                .alias("At least some tasks haven't stared on time")
                .pollInterval(10L, TimeUnit.MILLISECONDS)
                .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS)
                .until(() -> runningTasks.get() > 0);

        Awaitility.await()
                .alias("Tasks haven't been stopped on time")
                .pollInterval(10L, TimeUnit.MILLISECONDS)
                .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS)
                .until(() -> runningTasks.get() <= 0);

        waitForEngineToStop();
    }

    /**
     * Checks that the completion callback is invoked with {@code success == true} and no error
     * after the engine has processed 20 lines and was stopped.
     */
    @Test
    public void testCompletionCallbackCalledUponSuccess() throws Exception {
        final Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("tasks.max", "1");
        props.setProperty("connector.class", FileStreamSourceConnector.class.getName());
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "testTopic");

        appendLinesToSource(NUMBER_OF_LINES);

        final AtomicInteger processed = new AtomicInteger();
        final CountDownLatch batchesSeen = new CountDownLatch(2);
        final CountDownLatch completedOk = new CountDownLatch(1);

        this.engine = new AsyncEmbeddedEngine.AsyncEngineBuilder()
                .using(props)
                .using(new TestEngineConnectorCallback())
                .using((success, message, error) -> {
                    // Only a clean completion releases the latch.
                    if (success && error == null) {
                        completedOk.countDown();
                    }
                })
                .notifying((records, committer) -> {
                    for (SourceRecord rec : records) {
                        committer.markProcessed(rec);
                        processed.getAndIncrement();
                    }
                    committer.markBatchFinished();
                    batchesSeen.countDown();
                })
                .build();

        this.engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        appendLinesToSource(NUMBER_OF_LINES);
        batchesSeen.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
        Assertions.assertThat(processed.get()).isEqualTo(20);

        stopEngine();

        completedOk.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
        Assertions.assertThat(completedOk.getCount()).isEqualTo(0L);
    }

    /**
     * Checks that the completion callback is invoked with {@code success == false} and an
     * {@link InterruptedException} as the error when running {@code InterruptedConnector}.
     */
    @Test
    public void testCompletionCallbackCalledUponFailure() throws Exception {
        final Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.setProperty("tasks.max", "1");
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), DebeziumAsyncEngineTestUtils.InterruptedConnector.class.getName());
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.put("batch.count", 1);
        props.put(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, "10");

        final CountDownLatch failureReported = new CountDownLatch(1);

        this.engine = new AsyncEmbeddedEngine.AsyncEngineBuilder()
                .using(props)
                .using(new TestEngineConnectorCallback())
                .using((success, message, error) -> {
                    // Release the latch only for an unsuccessful run caused by an interruption.
                    if (!success && error instanceof InterruptedException) {
                        failureReported.countDown();
                    }
                })
                .notifying((records, committer) -> {
                })
                .build();

        this.engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        failureReported.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
        Assertions.assertThat(failureReported.getCount()).isEqualTo(0L);
    }

    /**
     * Checks that stopping the engine while a task is still inside its start method fails with an
     * {@link IllegalStateException}, and that the engine can be stopped normally once the task start
     * has been released and the task is reported as started.
     */
    @Test
    @FixFor({"DBZ-2534"})
    public void testCannotStopWhileTasksAreStarting() throws Exception {
        final Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.setProperty("tasks.max", "1");
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), WaitInTaskStartConnector.class.getName());
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.put("batch.count", 1);
        props.put(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, "10");

        this.engine = new AsyncEmbeddedEngine.AsyncEngineBuilder()
                .using(props)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {
                })
                .build();

        this.engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        // Fail right here with a clear message if the task never reached its start method,
        // instead of continuing and failing later on an unrelated assertion.
        Assertions.assertThat(WaitInTaskStartTask.taskStartingLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS))
                .as("task start was reached in time")
                .isTrue();

        // The task is now blocked inside start(), so stopping must be rejected.
        Assertions.assertThatThrownBy(this::stopEngine)
                .isInstanceOf(IllegalStateException.class)
                .hasMessage("Cannot stop engine while tasks are starting, this may lead to leaked resource. Wait for the tasks to be fully started.");

        // Let the task finish starting; afterwards a regular stop has to work.
        WaitInTaskStartTask.continueLatch.countDown();
        waitForTasksToStart(1);
        stopEngine();
    }

    /**
     * Checks that stopping an engine a second time, after it has already been stopped, fails with an
     * {@link IllegalStateException} carrying the "already shut down" message.
     */
    @Test
    public void testCannotStopAlreadyStoppedEngine() throws Exception {
        final Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.setProperty("tasks.max", "1");
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), DebeziumAsyncEngineTestUtils.NoOpConnector.class.getName());
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.put("batch.count", 1);
        props.put(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, "10");

        this.engine = new AsyncEmbeddedEngine.AsyncEngineBuilder()
                .using(props)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {
                })
                .build();

        this.engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        // First stop: regular shutdown of a fully started engine.
        waitForTasksToStart(1);
        stopEngine();
        waitForEngineToStop();

        // Second stop: must be rejected.
        Assertions.assertThatThrownBy(this::stopEngine)
                .isInstanceOf(IllegalStateException.class)
                .hasMessage("Engine has been already shut down.");
    }

    /**
     * Runs the engine with a predicate-guarded filter transform followed by a {@code RegexRouter}
     * and checks that every delivered record carries the re-routed topic name.
     */
    @Test
    public void testExecuteSmt() throws Exception {
        final Properties props = new Properties();
        // Engine and file connector settings.
        props.setProperty("name", "debezium-engine");
        props.setProperty("tasks.max", "1");
        props.setProperty("connector.class", FileStreamSourceConnector.class.getName());
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "testTopic");
        // Predicate and transformation chain.
        props.setProperty("predicates", "filter");
        props.setProperty("predicates.filter.type", DebeziumEngineTestUtils.FilterPredicate.class.getName());
        props.setProperty("transforms", "filter, router");
        props.setProperty("transforms.filter.type", "io.debezium.embedded.DebeziumEngineTestUtils$FilterTransform");
        props.setProperty("transforms.filter.predicate", "filter");
        props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");
        props.setProperty("transforms.router.regex", "(.*)");
        props.setProperty("transforms.router.replacement", "routing_smt_$1");

        appendLinesToSource(NUMBER_OF_LINES);

        final CountDownLatch firstBatch = new CountDownLatch(1);
        final CountDownLatch allBatches = new CountDownLatch(5);

        this.engine = new AsyncEmbeddedEngine.AsyncEngineBuilder()
                .using(props)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {
                    Assertions.assertThat(records.size()).isGreaterThanOrEqualTo(9);
                    // Verify the routing of the whole batch before anything is marked as processed.
                    for (SourceRecord routed : records) {
                        Assertions.assertThat(routed.topic()).isEqualTo("routing_smt_testTopic");
                    }
                    final int groups = records.size() / NUMBER_OF_LINES;
                    for (SourceRecord rec : records) {
                        committer.markProcessed(rec);
                    }
                    committer.markBatchFinished();
                    firstBatch.countDown();
                    for (int group = 0; group < groups; group++) {
                        allBatches.countDown();
                    }
                })
                .build();

        this.engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        firstBatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
        Assertions.assertThat(firstBatch.getCount()).isEqualTo(0L);

        for (int round = 1; round <= 5; round++) {
            appendLinesToSource(NUMBER_OF_LINES);
            Thread.sleep(10L);
        }

        allBatches.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
        Assertions.assertThat(allBatches.getCount()).isEqualTo(0L);

        stopEngine();
    }

    /**
     * Runs {@link SimpleSourceConnector} with {@code error.retriable.on} set to {@code "5, 7"} and
     * checks that ten single-record batches are nevertheless delivered to the consumer.
     */
    @Test
    public void testPollingIsRetriedUponFailure() throws Exception {
        final Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("tasks.max", "1");
        props.setProperty("connector.class", SimpleSourceConnector.class.getName());
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("error.retriable.on", "5, 7");

        final CountDownLatch batches = new CountDownLatch(NUMBER_OF_LINES);

        this.engine = new AsyncEmbeddedEngine.AsyncEngineBuilder()
                .using(props)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {
                    // Every batch is expected to hold exactly one record.
                    Assertions.assertThat(records.size()).isEqualTo(1);
                    final SourceRecord only = records.get(0);
                    committer.markProcessed(only);
                    committer.markBatchFinished();
                    batches.countDown();
                })
                .build();

        this.engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        batches.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
        Assertions.assertThat(batches.getCount()).isEqualTo(0L);

        stopEngine();
    }

    /**
     * Same setup as the retry test but with the maximum number of retries limited to one: the test
     * expects only six of the ten batches to arrive (latch left at 4), the engine to stop on its own,
     * and the failure plus the state transitions to be logged.
     */
    @Test
    public void testConnectorFailsIfMaxRetriesExceeded() throws Exception {
        final Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("tasks.max", "1");
        props.setProperty("connector.class", SimpleSourceConnector.class.getName());
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("error.retriable.on", "5, 7");
        props.setProperty(EmbeddedEngineConfig.ERRORS_MAX_RETRIES.name(), "1");

        final CountDownLatch batches = new CountDownLatch(NUMBER_OF_LINES);
        // Created before the engine is built so that its log output is captured.
        final LogInterceptor engineLog = new LogInterceptor(AsyncEmbeddedEngine.class);

        this.engine = new AsyncEmbeddedEngine.AsyncEngineBuilder()
                .using(props)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {
                    Assertions.assertThat(records.size()).isEqualTo(1);
                    final SourceRecord only = records.get(0);
                    committer.markProcessed(only);
                    committer.markBatchFinished();
                    batches.countDown();
                })
                .build();

        this.engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        // The latch is not expected to reach zero; the wait simply gives the engine time to fail.
        batches.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
        Assertions.assertThat(batches.getCount()).isEqualTo(4L);

        waitForEngineToStop();

        Assertions.assertThat(engineLog.containsErrorMessage("Engine has failed with")).isTrue();
        Assertions.assertThat(engineLog.containsMessage("Engine state has changed from 'POLLING_TASKS' to 'STOPPING'")).isTrue();
        Assertions.assertThat(engineLog.containsMessage("Engine state has changed from 'STOPPING' to 'STOPPED'")).isTrue();
    }

    /**
     * Runs the per-record consumer lifecycle scenario with record processing order {@code ORDERED}.
     */
    @Test
    public void testEngineBasicLifecycleConsumerSequentially() throws Exception {
        final Properties props = new Properties();

        // Processing mode under test.
        props.setProperty(AsyncEngineConfig.RECORD_PROCESSING_ORDER.name(), "ORDERED");

        // Engine and file connector settings.
        props.setProperty("name", "debezium-engine");
        props.setProperty("tasks.max", "1");
        props.setProperty("connector.class", FileStreamSourceConnector.class.getName());
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "testTopic");

        runEngineBasicLifecycleWithConsumer(props);
    }

    /**
     * Runs the per-record consumer lifecycle scenario with record processing order {@code UNORDERED}.
     */
    @Test
    public void testEngineBasicLifecycleConsumerNonSequentially() throws Exception {
        final Properties props = new Properties();

        // Processing mode under test.
        props.setProperty(AsyncEngineConfig.RECORD_PROCESSING_ORDER.name(), "UNORDERED");

        // Engine and file connector settings.
        props.setProperty("name", "debezium-engine");
        props.setProperty("tasks.max", "1");
        props.setProperty("connector.class", FileStreamSourceConnector.class.getName());
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "testTopic");

        runEngineBasicLifecycleWithConsumer(props);
    }

    /**
     * Checks the ordering of callbacks on shutdown: the completion callback must only run after the
     * (deliberately slow) {@code connectorStopped()} connector callback has finished.
     */
    @Test
    @FixFor({"DBZ-7496"})
    public void testCompletionCallbackCalledAfterConnectorStop() throws Exception {
        final Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("tasks.max", "1");
        props.setProperty("connector.class", FileStreamSourceConnector.class.getName());
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "testTopic");

        appendLinesToSource(NUMBER_OF_LINES);

        final CountDownLatch batchesSeen = new CountDownLatch(2);
        final CountDownLatch completedOk = new CountDownLatch(1);
        final AtomicInteger processed = new AtomicInteger();
        // Flipped at the very end of connectorStopped(); the completion callback asserts it is set.
        final AtomicBoolean stopCallbackDone = new AtomicBoolean(false);

        final DebeziumEngine.ConnectorCallback slowStopCallback = new DebeziumEngine.ConnectorCallback() {
            @Override
            public void connectorStarted() {
                isEngineRunning.compareAndExchange(false, true);
            }

            @Override
            public void connectorStopped() {
                // Delay the stop notification so that a prematurely invoked completion callback is detected.
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    LOGGER.warn("Connector callback was interrupted.");
                }
                stopCallbackDone.set(true);
                isEngineRunning.set(false);
            }
        };

        this.engine = new AsyncEmbeddedEngine.AsyncEngineBuilder()
                .using(props)
                .using((success, message, error) -> {
                    if (success && error == null) {
                        Assertions.assertThat(stopCallbackDone.get()).isTrue();
                        completedOk.countDown();
                    }
                })
                .notifying((records, committer) -> {
                    for (SourceRecord rec : records) {
                        committer.markProcessed(rec);
                        processed.getAndIncrement();
                    }
                    committer.markBatchFinished();
                    batchesSeen.countDown();
                })
                .using(slowStopCallback)
                .build();

        this.engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        waitForEngineToStart();

        LOGGER.info("Stopping engine");
        this.engine.close();

        completedOk.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
        Assertions.assertThat(completedOk.getCount()).isEqualTo(0L);
        Assertions.assertThat(stopCallbackDone.get()).isTrue();
    }

    /**
     * Shared scenario for the per-record consumer tests: writes lines before and while the engine is
     * running and waits until the consumer has been called for all 60 records, then stops the engine.
     *
     * @param properties the engine configuration to run with
     * @throws IOException if appending lines to the input file fails
     * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
     */
    private void runEngineBasicLifecycleWithConsumer(Properties properties) throws IOException, InterruptedException {
        final LogInterceptor engineLog = new LogInterceptor(AsyncEmbeddedEngine.class);
        engineLog.setLoggerLevel(AsyncEmbeddedEngine.class, Level.DEBUG);

        appendLinesToSource(NUMBER_OF_LINES);

        // 60 = the initial chunk plus the five chunks appended below.
        final CountDownLatch recordsSeen = new CountDownLatch(60);

        this.engine = new AsyncEmbeddedEngine.AsyncEngineBuilder()
                .using(properties)
                .using(new TestEngineConnectorCallback())
                .notifying(rec -> recordsSeen.countDown())
                .build();

        this.engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        for (int round = 1; round <= 5; round++) {
            appendLinesToSource(NUMBER_OF_LINES);
            Thread.sleep(10L);
        }

        recordsSeen.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
        Assertions.assertThat(recordsSeen.getCount()).isEqualTo(0L);

        // NOTE(review): this assertThat(...) has no terminal assertion (e.g. isTrue()), so it currently
        // verifies nothing. Enabling it may need a different expected processor name per processing
        // order - confirm against the engine's log output before changing it.
        Assertions.assertThat(engineLog.containsMessage("Using io.debezium.embedded.async.AsyncEmbeddedEngine$ParallelSmtConsumerProcessor processor"));

        stopEngine();
    }

    /**
     * Closes the engine and waits (up to the engine wait time) until the running flag is cleared.
     * If closing fails with an {@link IOException} or the flag is not cleared in time, a warning is
     * logged and the engine executor is shut down forcibly. Other exceptions propagate to the caller.
     */
    protected void stopEngine() {
        try {
            LOGGER.info("Stopping engine");
            this.engine.close();
            Awaitility.await()
                    .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS)
                    .until(() -> !isEngineRunning.get());
        } catch (IOException closeFailure) {
            LOGGER.warn("Failed during engine stop", closeFailure);
            this.engineExecSrv.shutdownNow();
        } catch (ConditionTimeoutException timeout) {
            LOGGER.warn("Engine has not stopped on time");
            this.engineExecSrv.shutdownNow();
        }
    }

    /**
     * Blocks until the running flag is set, polling every 10 ms for at most the engine wait time.
     */
    protected void waitForEngineToStart() {
        Awaitility.await()
                .alias("Engine haven't started on time")
                .pollInterval(10L, TimeUnit.MILLISECONDS)
                .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS)
                .until(() -> isEngineRunning.get());
    }

    /**
     * Blocks until the running flag is cleared, polling every 10 ms for at most the engine wait time.
     */
    protected void waitForEngineToStop() {
        Awaitility.await()
                .alias("Engine haven't stopped on time")
                .pollInterval(10L, TimeUnit.MILLISECONDS)
                .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS)
                .until(() -> !isEngineRunning.get());
    }

    /**
     * Blocks until at least {@code i} tasks are reported as running, polling every 10 ms for at most
     * the engine wait time.
     *
     * @param i the minimum number of running tasks to wait for
     */
    protected void waitForTasksToStart(int i) {
        Awaitility.await()
                // The alias shows up in the timeout error, so it must name what was actually awaited.
                .alias("Tasks haven't started on time")
                .pollInterval(10L, TimeUnit.MILLISECONDS)
                .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS)
                .until(() -> runningTasks.get() >= i);
    }

    /**
     * Appends lines to the input file via {@code DebeziumEngineTestUtils.appendLinesToSource} and adds
     * the value it returns to the running {@code linesAdded} total.
     *
     * @param i the number of lines requested to be appended
     * @throws IOException if writing to the input file fails
     */
    protected void appendLinesToSource(int i) throws IOException {
        final int appended = DebeziumEngineTestUtils.appendLinesToSource(this.inputFile, i, this.linesAdded);
        this.linesAdded += appended;
    }
}
