package io.debezium.embedded;

import io.debezium.config.Configuration;
import io.debezium.config.Instantiator;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.DebeziumEngine;
import io.debezium.function.BooleanConsumer;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.TestLogger;
import io.debezium.pipeline.txmetadata.TransactionStatus;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/embedded/AbstractConnectorTest.class */
public abstract class AbstractConnectorTest implements Testing {
    /** Absolute path of the offsets file; it is deleted before every test by {@code initializeConnectorTestFramework()}. */
    protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath("file-connector-offsets.txt").toAbsolutePath();
    private static final String TEST_PROPERTY_PREFIX = "debezium.test.";
    // Executor that runs the embedded engine; created in start(...) and reset to null by stopConnector(...).
    private ExecutorService executor;
    // Engine built by start(...); reset to null by stopConnector(...).
    protected EmbeddedEngine engine;
    // Records handed over by the engine's consumer (see getConsumer) and polled by the consume* methods.
    protected BlockingQueue<SourceRecord> consumedLines;
    // Counted down by the callbacks installed in start(...); start(...) waits on it before returning.
    private CountDownLatch latch;

    @Rule
    public TestRule skipTestRule = new SkipTestRule();
    // Timeout of a single poll on consumedLines; defaults to 10 seconds and is changed via setConsumeTimeout(...).
    protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(10);
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    // The JSON converters/deserializers below are re-created and configured before each test.
    private JsonConverter keyJsonConverter = new JsonConverter();
    private JsonConverter valueJsonConverter = new JsonConverter();
    private JsonDeserializer keyJsonDeserializer = new JsonDeserializer();
    private JsonDeserializer valueJsonDeserializer = new JsonDeserializer();
    // Passed to VerifyRecord.isValid(record, flag) by consumeRecordsUntil(...); enabled through skipAvroValidation().
    private boolean skipAvroValidation = false;

    @Rule
    public TestRule logTestName = new TestLogger(this.logger);

    /* loaded from: input_file:io/debezium/embedded/AbstractConnectorTest$SourceRecords.class */
    /**
     * Accumulates consumed {@link SourceRecord}s in arrival order and indexes them by topic and,
     * for records whose value has a {@code databaseName} field, by the value of that field.
     */
    protected class SourceRecords {
        private final List<SourceRecord> records = new ArrayList<>();
        private final Map<String, List<SourceRecord>> recordsByTopic = new HashMap<>();
        private final Map<String, List<SourceRecord>> ddlRecordsByDbName = new HashMap<>();

        protected SourceRecords() {
        }

        /**
         * Appends the record to the ordered list and to the per-topic index; when the record names
         * an affected database it is also added to the per-database index.
         */
        public void add(SourceRecord sourceRecord) {
            this.records.add(sourceRecord);
            this.recordsByTopic.computeIfAbsent(sourceRecord.topic(), topic -> new ArrayList<>()).add(sourceRecord);

            String dbName = getAffectedDatabase(sourceRecord);
            if (dbName == null) {
                return;
            }
            this.ddlRecordsByDbName.computeIfAbsent(dbName, name -> new ArrayList<>()).add(sourceRecord);
        }

        /**
         * Returns the {@code databaseName} value of the record, or {@code null} when the record has
         * no value or its value schema has no such field.
         */
        protected String getAffectedDatabase(SourceRecord sourceRecord) {
            Struct value = (Struct) sourceRecord.value();
            if (value == null) {
                return null;
            }
            Field dbField = value.schema().field("databaseName");
            return dbField == null ? null : value.getString(dbField.name());
        }

        /** Returns the records indexed under the given database name, or {@code null} if there are none. */
        public List<SourceRecord> ddlRecordsForDatabase(String str) {
            return this.ddlRecordsByDbName.get(str);
        }

        /** Returns the database names that have at least one indexed record. */
        public Set<String> databaseNames() {
            return this.ddlRecordsByDbName.keySet();
        }

        /** Returns the records seen for the given topic, or {@code null} if none were added. */
        public List<SourceRecord> recordsForTopic(String str) {
            return this.recordsByTopic.get(str);
        }

        /** Returns the topics that have at least one record. */
        public Set<String> topics() {
            return this.recordsByTopic.keySet();
        }

        /** Applies the consumer to every record of the topic; the topic must have records. */
        public void forEachInTopic(String str, Consumer<SourceRecord> consumer) {
            List<SourceRecord> topicRecords = recordsForTopic(str);
            topicRecords.forEach(consumer);
        }

        /** Applies the consumer to every record in arrival order. */
        public void forEach(Consumer<SourceRecord> consumer) {
            this.records.forEach(consumer);
        }

        /** Returns a read-only view of all records in arrival order. */
        public List<SourceRecord> allRecordsInOrder() {
            return Collections.unmodifiableList(this.records);
        }

        /** Prints a per-topic summary followed by every record. */
        public void print() {
            Set<String> topicNames = topics();
            Testing.print(topicNames.size() + " topics: " + topicNames);
            for (Map.Entry<String, List<SourceRecord>> entry : this.recordsByTopic.entrySet()) {
                Testing.print(" - topic:'" + entry.getKey() + "'; # of events = " + entry.getValue().size());
            }
            Testing.print("Records:");
            this.records.forEach(AbstractConnectorTest.this::print);
        }
    }

    /**
     * Prepares the test harness before each test: sets the logging context, re-creates and
     * configures the JSON converters and deserializers, calls {@code resetBeforeEachTest()},
     * allocates a fresh record queue and removes any previous offsets file.
     */
    @Before
    public final void initializeConnectorTestFramework() {
        LoggingContext.forConnector(getClass().getSimpleName(), "", "test");

        // Converters: the boolean flag tells the converter whether it handles keys.
        Configuration converterConfig = Configuration.create().build();
        this.keyJsonConverter = new JsonConverter();
        this.valueJsonConverter = new JsonConverter();
        this.keyJsonConverter.configure(converterConfig.asMap(), true);
        this.valueJsonConverter.configure(converterConfig.asMap(), false);

        // Deserializers get their own (also empty) configuration.
        Configuration deserializerConfig = Configuration.create().build();
        this.keyJsonDeserializer = new JsonDeserializer();
        this.valueJsonDeserializer = new JsonDeserializer();
        this.keyJsonDeserializer.configure(deserializerConfig.asMap(), true);
        this.valueJsonDeserializer.configure(deserializerConfig.asMap(), false);

        resetBeforeEachTest();

        int queueCapacity = getMaximumEnqueuedRecordCount();
        this.consumedLines = new ArrayBlockingQueue<>(queueCapacity);

        // Start from a clean offsets file, making sure its directory exists.
        Testing.Files.delete(OFFSET_STORE_PATH);
        OFFSET_STORE_PATH.getParent().toFile().mkdirs();
    }

    /**
     * Stops the connector after each test without reporting the final running state to anyone.
     */
    @After
    public final void stopConnector() {
        stopConnector(null);
    }

    /**
     * Stops the engine (if it is running), shuts down the executor that runs it and waits for both
     * to terminate. The {@code engine} and {@code executor} fields are always reset to {@code null}.
     *
     * <p>If the calling thread is interrupted while waiting, a warning is logged, the interrupt flag
     * is restored and the corresponding wait is abandoned.
     *
     * @param booleanConsumer optional callback that receives whether the engine is still running
     *        after the shutdown attempt; may be {@code null}
     */
    public void stopConnector(BooleanConsumer booleanConsumer) {
        try {
            this.logger.info("Stopping the connector");
            if (this.engine != null && this.engine.isRunning()) {
                this.logger.info("Stopping the engine");
                this.engine.stop();
                try {
                    this.engine.await(60L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    this.logger.warn("Engine has not stopped on time");
                    Thread.currentThread().interrupt();
                }
            }
            if (this.executor != null) {
                this.logger.info("Interrupting the engine");
                // No task may be left un-started: the only task submitted is the engine itself.
                Assertions.assertThat(this.executor.shutdownNow()).isEmpty();
                try {
                    // The try wraps the whole loop so an interrupt ends the wait instead of spinning.
                    while (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)) {
                        // keep waiting until the engine thread has terminated
                    }
                } catch (InterruptedException e) {
                    this.logger.warn("Executor has not stopped on time");
                    Thread.currentThread().interrupt();
                }
            }
            if (this.engine != null && this.engine.isRunning()) {
                this.logger.info("Waiting for engine to stop");
                try {
                    while (!this.engine.await(60L, TimeUnit.SECONDS)) {
                        // keep waiting until the connector has stopped completely
                    }
                } catch (InterruptedException e) {
                    this.logger.warn("Connector has not stopped on time");
                    Thread.currentThread().interrupt();
                }
            }
            if (booleanConsumer != null) {
                booleanConsumer.accept(this.engine != null && this.engine.isRunning());
            }
        } finally {
            this.engine = null;
            this.executor = null;
        }
    }

    /**
     * Returns the capacity used for the {@code consumedLines} queue created before each test.
     *
     * @return the maximum number of records that can be enqueued; 100 in this implementation
     */
    protected int getMaximumEnqueuedRecordCount() {
        return 100;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a completion callback that only logs: the message at INFO level on success, the
     * message together with the error at ERROR level otherwise.
     *
     * @return the logging callback
     */
    public EmbeddedEngine.CompletionCallback loggingCompletion() {
        return (success, message, error) -> {
            if (!success) {
                this.logger.error(message, error);
                return;
            }
            this.logger.info(message);
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the connector with {@code loggingCompletion()} as the completion callback and no
     * stop predicate.
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration
     */
    public void start(Class<? extends SourceConnector> cls, Configuration configuration) {
        start(cls, configuration, loggingCompletion(), null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the connector like {@code start(cls, configuration)} but passes {@code false} as the
     * "ignore records after stop" flag, so records keep being enqueued regardless of the engine state.
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration
     */
    public void startAndConsumeTillEnd(Class<? extends SourceConnector> cls, Configuration configuration) {
        start(cls, configuration, loggingCompletion(), null, sourceRecord -> {
        }, false);
    }

    /**
     * Starts the connector with {@code loggingCompletion()} as the completion callback and the
     * given stop predicate.
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration
     * @param predicate when it matches a record the consumer throws to stop the connector; may be {@code null}
     */
    protected void start(Class<? extends SourceConnector> cls, Configuration configuration, Predicate<SourceRecord> predicate) {
        start(cls, configuration, loggingCompletion(), predicate);
    }

    /**
     * Starts the connector with the given stop predicate and with the "ignore records after stop"
     * flag set to {@code false}, so records keep being enqueued regardless of the engine state.
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration
     * @param predicate when it matches a record the consumer throws to stop the connector; may be {@code null}
     */
    protected void startAndConsumeTillEnd(Class<? extends SourceConnector> cls, Configuration configuration, Predicate<SourceRecord> predicate) {
        start(cls, configuration, loggingCompletion(), predicate, sourceRecord -> {
        }, false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the connector with the given completion callback and no stop predicate.
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration
     * @param completionCallback invoked when the engine completes
     */
    public void start(Class<? extends SourceConnector> cls, Configuration configuration, DebeziumEngine.CompletionCallback completionCallback) {
        start(cls, configuration, completionCallback, null);
    }

    /**
     * Starts the connector with the given completion callback and stop predicate, a no-op
     * per-record callback, and the "ignore records after stop" flag set to {@code true}.
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration
     * @param completionCallback invoked when the engine completes
     * @param predicate when it matches a record the consumer throws to stop the connector; may be {@code null}
     */
    protected void start(Class<? extends SourceConnector> cls, Configuration configuration, DebeziumEngine.CompletionCallback completionCallback, Predicate<SourceRecord> predicate) {
        start(cls, configuration, completionCallback, predicate, sourceRecord -> {
        }, true);
    }

    /**
     * Starts the connector with {@code loggingCompletion()}, no stop predicate, and additionally
     * registers the given change consumer on the engine builder.
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration
     * @param changeConsumer consumer registered via the builder's {@code notifying(...)}
     */
    protected void start(Class<? extends SourceConnector> cls, Configuration configuration, DebeziumEngine.ChangeConsumer changeConsumer) {
        start(cls, configuration, loggingCompletion(), null, sourceRecord -> {
        }, true, changeConsumer);
    }

    /**
     * Starts the connector without a custom change consumer; see the seven-argument overload.
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration
     * @param completionCallback invoked when the engine completes
     * @param predicate when it matches a record the consumer throws to stop the connector; may be {@code null}
     * @param consumer invoked for each record after it has been enqueued
     * @param z whether records are dropped once the engine is no longer running or the thread is interrupted
     */
    protected void start(Class<? extends SourceConnector> cls, Configuration configuration, DebeziumEngine.CompletionCallback completionCallback, Predicate<SourceRecord> predicate, Consumer<SourceRecord> consumer, boolean z) {
        start(cls, configuration, completionCallback, predicate, consumer, z, null);
    }

    /**
     * Builds an embedded engine for the connector, runs it on a dedicated single-thread executor and
     * blocks until a task has started or the engine completed unsuccessfully (at most five minutes),
     * then waits until the task has left its initial state.
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration; engine name, connector class, offsets file
     *        and a zero offset flush interval are added on top of it
     * @param completionCallback invoked when the engine completes; may be {@code null}
     * @param predicate when it matches a record the consumer throws to stop the connector; may be {@code null}
     * @param consumer invoked for each record after it has been enqueued
     * @param z whether records are dropped once the engine is no longer running or the thread is interrupted
     * @param changeConsumer optional consumer registered via the builder's {@code notifying(...)}; may be {@code null}
     */
    protected void start(Class<? extends SourceConnector> cls, Configuration configuration, DebeziumEngine.CompletionCallback completionCallback, Predicate<SourceRecord> predicate, Consumer<SourceRecord> consumer, boolean z, DebeziumEngine.ChangeConsumer changeConsumer) {
        Configuration build = Configuration.copy(configuration).with(EmbeddedEngine.ENGINE_NAME, "testing-connector").with(EmbeddedEngine.CONNECTOR_CLASS, cls.getName()).with("offset.storage.file.filename", OFFSET_STORE_PATH).with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0).build();
        this.latch = new CountDownLatch(1);
        EmbeddedEngine.CompletionCallback completionCallback2 = (z2, str, th) -> {
            try {
                if (completionCallback != null) {
                    completionCallback.handle(z2, str, th);
                }
            } finally {
                // Release the waiter on failure even when no user callback was supplied; on
                // success the latch is released by taskStarted() instead.
                if (!z2) {
                    this.latch.countDown();
                }
            }
            Testing.debug("Stopped connector");
        };
        EmbeddedEngine.ConnectorCallback connectorCallback = new EmbeddedEngine.ConnectorCallback() {
            @Override
            public void taskStarted() {
                AbstractConnectorTest.this.latch.countDown();
            }
        };
        EmbeddedEngine.Builder create = EmbeddedEngine.create();
        create.using(build).notifying(getConsumer(predicate, consumer, z)).using(getClass().getClassLoader()).using(completionCallback2).using(connectorCallback);
        if (changeConsumer != null) {
            create.notifying(changeConsumer);
        }
        this.engine = create.build();
        // A previous engine must have been stopped (stopConnector resets the executor to null).
        Assertions.assertThat(this.executor).isNull();
        this.executor = Executors.newFixedThreadPool(1);
        this.executor.execute(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });
        try {
            if (!this.latch.await(5L, TimeUnit.MINUTES)) {
                this.logger.warn("The connector did not finish starting its task(s) or complete in the expected amount of time");
            }
            waitForNotInitialState();
        } catch (InterruptedException e) {
            // await() clears the interrupt flag when it throws, so restore it before failing the test.
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while waiting for engine startup");
        }
    }

    /**
     * Waits until the engine's task, when it is a {@link BaseSourceTask}, reports a state other
     * than {@code INITIAL}. The state is polled every 100 ms for at most
     * {@code waitTimeForRecords() * 30} seconds. Other task types are not waited for.
     */
    protected void waitForNotInitialState() {
        this.engine.runWithTask(task -> {
            if (!(task instanceof BaseSourceTask)) {
                return;
            }
            BaseSourceTask baseTask = (BaseSourceTask) task;
            Awaitility.await()
                    .alias("Task has attempted to initialize coordinator")
                    .pollInterval(100L, TimeUnit.MILLISECONDS)
                    .atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS)
                    .until(() -> baseTask.getTaskState() != BaseSourceTask.State.INITIAL);
        });
    }

    /**
     * Builds the per-record consumer handed to the engine.
     *
     * <p>For each record the consumer first checks the stop predicate and throws a
     * {@link ConnectException} when it matches. Otherwise it retries offering the record to
     * {@code consumedLines} until it is accepted and then invokes {@code consumer}. When {@code z}
     * is set, the record is dropped as soon as the engine is no longer running or the current
     * thread is interrupted.
     *
     * @param predicate stop condition; may be {@code null}
     * @param consumer invoked after the record has been enqueued
     * @param z whether records are dropped once the engine stopped or the thread is interrupted
     * @return the consumer to register with the engine
     */
    protected Consumer<SourceRecord> getConsumer(Predicate<SourceRecord> predicate, Consumer<SourceRecord> consumer, boolean z) {
        return sourceRecord -> {
            if (predicate != null && predicate.test(sourceRecord)) {
                String stopMessage = "Stopping connector after record as requested";
                this.logger.error(stopMessage);
                throw new ConnectException(stopMessage);
            }
            // Drop the record up front when asked to ignore records after stop.
            if (z && (!this.engine.isRunning() || Thread.currentThread().isInterrupted())) {
                return;
            }
            // The queue is bounded: keep retrying until there is room or we are told to give up.
            while (!this.consumedLines.offer(sourceRecord)) {
                if (z && (!this.engine.isRunning() || Thread.currentThread().isInterrupted())) {
                    return;
                }
            }
            consumer.accept(sourceRecord);
        };
    }

    /**
     * Sets the timeout applied to each poll of the consumed-record queue.
     *
     * @param j the timeout; must not be negative
     * @param timeUnit the unit of {@code j}
     * @throws IllegalArgumentException if {@code j} is negative
     */
    protected void setConsumeTimeout(long j, TimeUnit timeUnit) {
        if (j >= 0) {
            this.pollTimeoutInMs = timeUnit.toMillis(j);
            return;
        }
        throw new IllegalArgumentException("The timeout may not be negative");
    }

    /**
     * Polls a single record from the consumed-record queue, waiting up to {@code pollTimeoutInMs}.
     *
     * @return the next record, or {@code null} if none arrived within the timeout
     * @throws InterruptedException if interrupted while waiting
     */
    protected SourceRecord consumeRecord() throws InterruptedException {
        return this.consumedLines.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Consumes up to {@code i} records without a per-record callback.
     *
     * @param i the number of records to consume
     * @return the number of records actually consumed
     * @throws InterruptedException if interrupted while waiting for records
     */
    protected int consumeRecords(int i) throws InterruptedException {
        return consumeRecords(i, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Consumes records until {@code i} have been received, giving up after {@code i2} consecutive
     * empty polls.
     *
     * @param i the number of records to consume
     * @param i2 the number of consecutive empty polls after which consumption stops
     * @param consumer optional per-record callback; may be {@code null}
     * @param z whether each record is validated
     * @return the number of records actually consumed
     * @throws InterruptedException if interrupted while waiting for records
     */
    public int consumeRecords(int i, int i2, Consumer<SourceRecord> consumer, boolean z) throws InterruptedException {
        BiPredicate<Integer, SourceRecord> reachedTarget = (count, record) -> count >= i;
        BiFunction<Integer, SourceRecord, String> progressMessage = (count, record) -> "Consumed record " + count + " / " + i + " (" + (i - count) + " more)";
        return consumeRecordsUntil(reachedTarget, progressMessage, i2, consumer, z);
    }

    /**
     * Polls records while the engine is running until the condition holds or {@code i} consecutive
     * polls returned nothing.
     *
     * @param biPredicate evaluated with the running count and the record just consumed; consumption
     *        stops once it returns {@code true}
     * @param biFunction produces the progress message that is logged when debug or print output is enabled
     * @param i the number of consecutive empty polls after which consumption stops
     * @param consumer optional per-record callback; may be {@code null}
     * @param z whether each record is validated via {@code VerifyRecord.isValid}
     * @return the number of records consumed
     * @throws InterruptedException if interrupted while waiting for records
     */
    protected int consumeRecordsUntil(BiPredicate<Integer, SourceRecord> biPredicate, BiFunction<Integer, SourceRecord, String> biFunction, int i, Consumer<SourceRecord> consumer, boolean z) throws InterruptedException {
        int consumed = 0;
        int emptyPolls = 0;
        boolean conditionMet = false;
        while (!conditionMet && this.engine.isRunning()) {
            SourceRecord record = this.consumedLines.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
            if (record == null) {
                emptyPolls++;
                if (emptyPolls >= i) {
                    break;
                }
                continue;
            }

            // A record arrived: the run of empty polls is over.
            emptyPolls = 0;
            consumed++;
            if (consumer != null) {
                consumer.accept(record);
            }
            if (Testing.Debug.isEnabled()) {
                Testing.debug(biFunction.apply(consumed, record));
                debug(record);
            } else if (Testing.Print.isEnabled()) {
                Testing.print(biFunction.apply(consumed, record));
                print(record);
            }
            if (z) {
                VerifyRecord.isValid(record, this.skipAvroValidation);
            }
            conditionMet = biPredicate.test(consumed, record);
        }
        return consumed;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Consumes up to {@code i} records with validation enabled, giving up after
     * {@code waitTimeForRecordsAfterNulls()} consecutive empty polls.
     *
     * @param i the number of records to consume
     * @param consumer optional per-record callback; may be {@code null}
     * @return the number of records actually consumed
     * @throws InterruptedException if interrupted while waiting for records
     */
    public int consumeRecords(int i, Consumer<SourceRecord> consumer) throws InterruptedException {
        return consumeRecords(i, waitTimeForRecordsAfterNulls(), consumer, true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Consumes up to {@code i} validated records and groups them in a {@link SourceRecords}.
     *
     * @param i the number of records to consume
     * @param i2 the number of consecutive empty polls after which consumption stops
     * @return the collected records
     * @throws InterruptedException if interrupted while waiting for records
     */
    public SourceRecords consumeRecordsByTopic(int i, int i2) throws InterruptedException {
        SourceRecords collected = new SourceRecords();
        consumeRecords(i, i2, collected::add, true);
        return collected;
    }

    /**
     * Drains the records that are currently enqueued, without waiting, and groups them in a
     * {@link SourceRecords}.
     *
     * @return the collected records
     * @throws InterruptedException declared for callers; not thrown by the visible code path
     */
    protected SourceRecords consumeAvailableRecordsByTopic() throws InterruptedException {
        SourceRecords collected = new SourceRecords();
        consumeAvailableRecords(collected::add);
        return collected;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Consumes up to {@code i} validated records using the default empty-poll limit and groups
     * them in a {@link SourceRecords}.
     *
     * @param i the number of records to consume
     * @return the collected records
     * @throws InterruptedException if interrupted while waiting for records
     */
    public SourceRecords consumeRecordsByTopic(int i) throws InterruptedException {
        SourceRecords collected = new SourceRecords();
        consumeRecords(i, collected::add);
        return collected;
    }

    /**
     * Consumes records, discarding them until the condition first matches a record's key and value,
     * and collects the matching record and everything after it. If fewer than {@code i} records were
     * collected in the first pass, a second pass consumes the remainder.
     *
     * @param i the number of records to collect
     * @param biPredicate tested with each record's key and value struct while skipping
     * @return the collected records
     * @throws InterruptedException if interrupted while waiting for records
     */
    protected SourceRecords consumeRecordsButSkipUntil(int i, BiPredicate<Struct, Struct> biPredicate) throws InterruptedException {
        SourceRecords collected = new SourceRecords();
        AtomicBoolean skipping = new AtomicBoolean(true);
        consumeRecords(i, record -> {
            if (skipping.get()) {
                boolean tripped = biPredicate.test((Struct) record.key(), (Struct) record.value());
                if (tripped) {
                    skipping.set(false);
                } else {
                    Testing.print("Skipped record");
                    print(record);
                    Testing.debug("Skipped record");
                    debug(record);
                }
            }
            if (!skipping.get()) {
                collected.add(record);
            }
        });

        // Skipped records counted against the first pass; fetch what is still missing.
        int missing = i - collected.allRecordsInOrder().size();
        if (missing > 0) {
            consumeRecords(missing, collected::add);
        }
        return collected;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Consumes validated records until the condition holds and groups them in a {@link SourceRecords}.
     *
     * @param biPredicate evaluated with the running count and the record just consumed
     * @return the collected records
     * @throws InterruptedException if interrupted while waiting for records
     */
    public SourceRecords consumeRecordsByTopicUntil(BiPredicate<Integer, SourceRecord> biPredicate) throws InterruptedException {
        SourceRecords collected = new SourceRecords();
        BiFunction<Integer, SourceRecord, String> progressMessage = (count, record) -> {
            String lastMarker = biPredicate.test(count, record) ? "last " : "";
            return "Consumed " + lastMarker + "record " + count;
        };
        int emptyPollLimit = waitTimeForRecordsAfterNulls();
        consumeRecordsUntil(biPredicate, progressMessage, emptyPollLimit, collected::add, true);
        return collected;
    }

    /**
     * Consumes up to {@code i} records using the default empty-poll limit and groups them in a
     * {@link SourceRecords}.
     *
     * @param i the number of records to consume
     * @param z whether each record is validated
     * @return the collected records
     * @throws InterruptedException if interrupted while waiting for records
     */
    protected SourceRecords consumeRecordsByTopic(int i, boolean z) throws InterruptedException {
        SourceRecords collected = new SourceRecords();
        int emptyPollLimit = waitTimeForRecordsAfterNulls();
        consumeRecords(i, emptyPollLimit, collected::add, z);
        return collected;
    }

    /**
     * Consumes records until {@code i} DML records were seen (transaction records are collected
     * too but not counted) and groups them in a {@link SourceRecords}.
     *
     * @param i the number of DML records to consume
     * @return the collected records
     * @throws InterruptedException if interrupted while waiting for records
     */
    protected SourceRecords consumeDmlRecordsByTopic(int i) throws InterruptedException {
        SourceRecords collected = new SourceRecords();
        consumeDmlRecordsByTopic(i, collected::add);
        return collected;
    }

    /**
     * Consumes DML records with validation enabled, giving up after
     * {@code waitTimeForRecordsAfterNulls()} consecutive empty polls.
     *
     * @param i the number of DML records to consume
     * @param consumer optional per-record callback; may be {@code null}
     * @return the number of DML records consumed
     * @throws InterruptedException if interrupted while waiting for records
     */
    protected int consumeDmlRecordsByTopic(int i, Consumer<SourceRecord> consumer) throws InterruptedException {
        return consumeDmlRecordsByTopic(i, waitTimeForRecordsAfterNulls(), consumer, true);
    }

    /**
     * Consumes records until {@code i} DML records were seen and every transaction that was opened
     * in the meantime has been closed, or until {@code i2} consecutive polls returned nothing.
     *
     * <p>Transaction metadata records open (status {@code BEGIN}) or close (any other status) a
     * transaction and are not counted. Transactions are tracked by the part of the metadata
     * {@code id} before the first {@code ':'}, which is what each DML record's {@code source.txId}
     * is compared against. Every DML record must belong to a currently open transaction.
     *
     * @param i the number of DML records to consume
     * @param i2 the number of consecutive empty polls after which consumption stops
     * @param consumer optional callback invoked for every record, including transaction records;
     *        may be {@code null}
     * @param z whether each record is validated via {@code VerifyRecord.isValid}
     * @return the number of DML records consumed
     * @throws InterruptedException if interrupted while waiting for records
     */
    protected int consumeDmlRecordsByTopic(int i, int i2, Consumer<SourceRecord> consumer, boolean z) throws InterruptedException {
        int dmlCount = 0;
        int emptyPolls = 0;
        Set<String> openTransactions = new LinkedHashSet<>();

        // Keep going while DML records are still missing, then drain until all open transactions ended.
        while (dmlCount < i || !openTransactions.isEmpty()) {
            SourceRecord record = this.consumedLines.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
            if (record == null) {
                emptyPolls++;
                if (emptyPolls >= i2) {
                    return dmlCount;
                }
                continue;
            }
            emptyPolls = 0;

            Struct value = (Struct) record.value();
            if (isTransactionRecord(record)) {
                String status = value.getString("status");
                // Always track the id prefix so that it matches the DML records' txId in both phases.
                String txId = value.getString("id").split(":", 2)[0];
                if (status.equals(TransactionStatus.BEGIN.name())) {
                    openTransactions.add(txId);
                } else {
                    openTransactions.remove(txId);
                }
            } else {
                String txId = value.getStruct("source").getInt64("txId").toString();
                Assertions.assertThat(openTransactions.contains(txId)).as("DML record txId " + txId + " not in open transaction set").isTrue();
                dmlCount++;
            }

            if (consumer != null) {
                consumer.accept(record);
            }
            if (Testing.Debug.isEnabled()) {
                Testing.debug("Consumed record " + dmlCount + " / " + i + " (" + (i - dmlCount) + " more), " + openTransactions.size() + " active transactions");
                debug(record);
            } else if (Testing.Print.isEnabled()) {
                Testing.print("Consumed record " + dmlCount + " / " + i + " (" + (i - dmlCount) + " more), " + openTransactions.size() + " active transactions");
                print(record);
            }
            if (z) {
                VerifyRecord.isValid(record);
            }
        }
        return dmlCount;
    }

    /**
     * Tells whether the record is a transaction metadata record: its topic ends with
     * {@code .transaction} and its key schema is named
     * {@code io.debezium.connector.common.TransactionMetadataKey}.
     *
     * <p>Records without a key schema, or with an unnamed key schema, are reported as not being
     * transaction records instead of causing a {@link NullPointerException}.
     *
     * @param sourceRecord the record to inspect; may be {@code null}
     * @return {@code true} if the record is a transaction metadata record
     */
    protected boolean isTransactionRecord(SourceRecord sourceRecord) {
        if (sourceRecord == null || !sourceRecord.topic().endsWith(".transaction")) {
            return false;
        }
        Schema keySchema = sourceRecord.keySchema();
        // Constant-first equals also covers a key schema without a name.
        return keySchema != null && "io.debezium.connector.common.TransactionMetadataKey".equals(keySchema.name());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Drains every record that is currently enqueued, without waiting, and hands each one to the
     * consumer when it is given.
     *
     * @param consumer optional per-record callback; may be {@code null}
     * @return the number of records drained
     */
    public int consumeAvailableRecords(Consumer<SourceRecord> consumer) {
        List<SourceRecord> drained = new ArrayList<>();
        this.consumedLines.drainTo(drained);
        if (consumer != null) {
            for (SourceRecord record : drained) {
                consumer.accept(record);
            }
        }
        return drained.size();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Spins until at least one record is enqueued or the timeout has elapsed.
     *
     * @param j the maximum time to wait; asserted to be non-negative
     * @param timeUnit the unit of {@code j}
     * @return {@code true} if the queue holds at least one record when the wait ends
     */
    public boolean waitForAvailableRecords(long j, TimeUnit timeUnit) {
        Assertions.assertThat(j).isGreaterThanOrEqualTo(0L);
        long deadline = System.currentTimeMillis() + timeUnit.toMillis(j);
        for (;;) {
            // Busy-wait: leave once the deadline passed or a record showed up.
            if (System.currentTimeMillis() >= deadline || !this.consumedLines.isEmpty()) {
                break;
            }
        }
        return !this.consumedLines.isEmpty();
    }

    /**
     * Sets the flag that {@code consumeRecordsUntil(...)} passes to {@code VerifyRecord.isValid}
     * as its second argument.
     */
    protected void skipAvroValidation() {
        this.skipAvroValidation = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Asserts that the engine reports itself as running; the engine must have been created. */
    public void assertConnectorIsRunning() {
        Assertions.assertThat(this.engine.isRunning()).isTrue();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Asserts that there is no engine or that the engine is not running. */
    public void assertConnectorNotRunning() {
        Assertions.assertThat(this.engine != null && this.engine.isRunning()).isFalse();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Asserts that the consumed-record queue is currently empty. */
    public void assertNoRecordsToConsume() {
        Assertions.assertThat(this.consumedLines.isEmpty()).isTrue();
    }

    /**
     * Asserts that every record still waiting in the queue is a transaction metadata record.
     * The records are inspected in place and stay in the queue.
     */
    protected void assertOnlyTransactionRecordsToConsume() {
        for (SourceRecord pending : this.consumedLines) {
            Assertions.assertThat(isTransactionRecord(pending)).isTrue();
        }
    }

    /**
     * Delegates to {@code VerifyRecord.hasValidKey}.
     * NOTE(review): {@code str} and {@code i} are presumably the key field name and its expected
     * value — confirm against VerifyRecord.
     */
    protected void assertKey(SourceRecord sourceRecord, String str, int i) {
        VerifyRecord.hasValidKey(sourceRecord, str, i);
    }

    /**
     * Delegates to {@code VerifyRecord.isValidInsert}.
     * NOTE(review): {@code str} and {@code i} are presumably the key field name and its expected
     * value — confirm against VerifyRecord.
     */
    protected void assertInsert(SourceRecord sourceRecord, String str, int i) {
        VerifyRecord.isValidInsert(sourceRecord, str, i);
    }

    /**
     * Delegates to {@code VerifyRecord.isValidUpdate}.
     * NOTE(review): {@code str} and {@code i} are presumably the key field name and its expected
     * value — confirm against VerifyRecord.
     */
    protected void assertUpdate(SourceRecord sourceRecord, String str, int i) {
        VerifyRecord.isValidUpdate(sourceRecord, str, i);
    }

    /**
     * Delegates to {@code VerifyRecord.isValidDelete}.
     * NOTE(review): {@code str} and {@code i} are presumably the key field name and its expected
     * value — confirm against VerifyRecord.
     */
    protected void assertDelete(SourceRecord sourceRecord, String str, int i) {
        VerifyRecord.isValidDelete(sourceRecord, str, i);
    }

    /**
     * Delegates to {@code VerifyRecord.hasValidSourceQuery}.
     * NOTE(review): {@code str} is presumably the expected query text — confirm against VerifyRecord.
     */
    protected void assertSourceQuery(SourceRecord sourceRecord, String str) {
        VerifyRecord.hasValidSourceQuery(sourceRecord, str);
    }

    /** Delegates to {@code VerifyRecord.hasNoSourceQuery}. */
    protected void assertHasNoSourceQuery(SourceRecord sourceRecord) {
        VerifyRecord.hasNoSourceQuery(sourceRecord);
    }

    /**
     * Delegates to the three-argument {@code VerifyRecord.isValidTombstone}.
     * NOTE(review): {@code str} and {@code i} are presumably the key field name and its expected
     * value — confirm against VerifyRecord.
     */
    protected void assertTombstone(SourceRecord sourceRecord, String str, int i) {
        VerifyRecord.isValidTombstone(sourceRecord, str, i);
    }

    /** Delegates to the single-argument {@code VerifyRecord.isValidTombstone}. */
    protected void assertTombstone(SourceRecord sourceRecord) {
        VerifyRecord.isValidTombstone(sourceRecord);
    }

    /**
     * Asserts that the record's source offset equals the given map.
     *
     * @param sourceRecord the record whose offset is checked
     * @param map the expected offset
     */
    protected void assertOffset(SourceRecord sourceRecord, Map<String, ?> map) {
        Assertions.assertThat(sourceRecord.sourceOffset()).isEqualTo(map);
    }

    /**
     * Asserts that a single entry of the record's source offset has the expected value, compared
     * via {@code VerifyRecord.assertSameValue}.
     *
     * @param sourceRecord the record whose offset is checked
     * @param str the offset key to look up
     * @param obj the expected value
     */
    protected void assertOffset(SourceRecord sourceRecord, String str, Object obj) {
        assertSameValue(sourceRecord.sourceOffset().get(str), obj);
    }

    /**
     * Delegates to {@code VerifyRecord.assertValueField}.
     * NOTE(review): {@code str} is presumably a field path within the record value and {@code obj}
     * the expected value — confirm against VerifyRecord.
     */
    protected void assertValueField(SourceRecord sourceRecord, String str, Object obj) {
        VerifyRecord.assertValueField(sourceRecord, str, obj);
    }

    /** Delegates to {@code VerifyRecord.assertSameValue}, passing the arguments in the same order. */
    private void assertSameValue(Object obj, Object obj2) {
        VerifyRecord.assertSameValue(obj, obj2);
    }

    /** Delegates to the single-argument {@code VerifyRecord.schemaMatchesStruct}. */
    protected void assertSchemaMatchesStruct(SchemaAndValue schemaAndValue) {
        VerifyRecord.schemaMatchesStruct(schemaAndValue);
    }

    /** Delegates to the two-argument {@code VerifyRecord.schemaMatchesStruct}. */
    protected void assertSchemaMatchesStruct(Struct struct, Schema schema) {
        VerifyRecord.schemaMatchesStruct(struct, schema);
    }

    /**
     * Asserts that the engine is running, describing the assertion as
     * "Engine should not fail due to an exception". The engine must have been created.
     */
    protected void assertEngineIsRunning() {
        Assertions.assertThat(this.engine.isRunning()).as("Engine should not fail due to an exception", new Object[0]).isTrue();
    }

    /** Delegates to the single-argument {@code VerifyRecord.isValid}. */
    protected void validate(SourceRecord sourceRecord) {
        VerifyRecord.isValid(sourceRecord);
    }

    /** Delegates to {@code VerifyRecord.print}. */
    protected void print(SourceRecord sourceRecord) {
        VerifyRecord.print(sourceRecord);
    }

    /** Delegates to {@code VerifyRecord.debug}. */
    protected void debug(SourceRecord sourceRecord) {
        VerifyRecord.debug(sourceRecord);
    }

    /**
     * Asserts that the configuration value for the field carries exactly {@code i} error messages.
     * The field must have a value in {@code config}; {@code configValue} returns {@code null} otherwise.
     *
     * @param config the validated configuration
     * @param field the field whose errors are checked
     * @param i the expected number of error messages
     */
    protected void assertConfigurationErrors(Config config, io.debezium.config.Field field, int i) {
        Assertions.assertThat(configValue(config, field.name()).errorMessages().size()).isEqualTo(i);
    }

    /**
     * Asserts that the number of error messages on the field's configuration value lies between
     * {@code i} and {@code i2}, both inclusive. The field must have a value in {@code config}.
     *
     * @param config the validated configuration
     * @param field the field whose errors are checked
     * @param i the minimum number of error messages
     * @param i2 the maximum number of error messages
     */
    protected void assertConfigurationErrors(Config config, io.debezium.config.Field field, int i, int i2) {
        int errorCount = configValue(config, field.name()).errorMessages().size();
        Assertions.assertThat(errorCount).isGreaterThanOrEqualTo(i);
        Assertions.assertThat(errorCount).isLessThanOrEqualTo(i2);
    }

    /**
     * Asserts that the configuration value for the field carries at least one error message.
     * The field must have a value in {@code config}; {@code configValue} returns {@code null} otherwise.
     *
     * @param config the validated configuration
     * @param field the field whose errors are checked
     */
    protected void assertConfigurationErrors(Config config, io.debezium.config.Field field) {
        Assertions.assertThat(configValue(config, field.name()).errorMessages().size()).isGreaterThan(0);
    }

    /**
     * Fails the test if any of the given fields has error messages in the configuration.
     * Fields without a configuration value are ignored.
     *
     * @param config the validated configuration
     * @param fieldArr the fields to check
     */
    protected void assertNoConfigurationErrors(Config config, io.debezium.config.Field... fieldArr) {
        for (io.debezium.config.Field field : fieldArr) {
            ConfigValue value = configValue(config, field.name());
            if (value == null || value.errorMessages().isEmpty()) {
                continue;
            }
            Assert.fail("Error messages on field '" + field.name() + "': " + value.errorMessages());
        }
    }

    /**
     * Looks up the first configuration value with the given name.
     *
     * @param config the validated configuration
     * @param str the name of the configuration value
     * @return the matching value, or {@code null} if there is none
     */
    protected ConfigValue configValue(Config config, String str) {
        for (ConfigValue candidate : config.configValues()) {
            if (candidate.name().equals(str)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Reads the last committed offset of a single partition from the offsets file.
     *
     * @param configuration the connector configuration
     * @param map the source partition
     * @return the value the offset reader holds for that partition
     */
    protected <T> Map<String, Object> readLastCommittedOffset(Configuration configuration, Map<String, T> map) {
        Map<Map<String, T>, Map<String, Object>> offsetsByPartition = readLastCommittedOffsets(configuration, Arrays.asList(map));
        return offsetsByPartition.get(map);
    }

    /**
     * Reads the last committed offsets of the given partitions from the file-based offset store
     * at {@code OFFSET_STORE_PATH}. The store is started for the read and always stopped afterwards.
     *
     * @param configuration the connector configuration; engine name, offsets file and a zero
     *        offset flush interval are set on top of it
     * @param collection the source partitions to look up
     * @return the offsets returned by the offset reader, keyed by partition
     */
    protected <T> Map<Map<String, T>, Map<String, Object>> readLastCommittedOffsets(Configuration configuration, Collection<Map<String, T>> collection) {
        Configuration engineConfig = configuration.edit()
                .with(EmbeddedEngine.ENGINE_NAME, "testing-connector")
                .with("offset.storage.file.filename", OFFSET_STORE_PATH)
                .with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
                .build();
        String engineName = engineConfig.getString(EmbeddedEngine.ENGINE_NAME);

        // Offsets are read with schema-less JSON converters.
        Map<String, String> converterSettings = Collections.singletonMap("schemas.enable", "false");
        Converter keyConverter = (Converter) Instantiator.getInstance(JsonConverter.class.getName());
        keyConverter.configure(converterSettings, true);
        Converter valueConverter = (Converter) Instantiator.getInstance(JsonConverter.class.getName());
        valueConverter.configure(converterSettings, false);

        Map<String, String> embeddedProps = engineConfig.asMap(EmbeddedEngine.ALL_FIELDS);
        embeddedProps.put("key.converter", JsonConverter.class.getName());
        embeddedProps.put("value.converter", JsonConverter.class.getName());

        FileOffsetBackingStore offsetStore = KafkaConnectUtil.fileOffsetBackingStore();
        offsetStore.configure(new EmbeddedEngine.EmbeddedConfig(embeddedProps));
        offsetStore.start();
        try {
            OffsetStorageReaderImpl reader = new OffsetStorageReaderImpl(offsetStore, engineName, keyConverter, valueConverter);
            return reader.offsets(collection);
        } finally {
            offsetStore.stop();
        }
    }

    /**
     * Verifies that the record is a transaction {@code BEGIN} marker and returns its transaction id.
     * <p>
     * Checks that the value's {@code status} is {@code "BEGIN"}, that {@code event_count} is null,
     * and that the key's {@code id} and the offset's {@code transaction_id} equal the value's {@code id}.
     *
     * @param sourceRecord the record to verify; its key and value are cast to {@link Struct}
     * @return the transaction id read from the record value
     */
    protected String assertBeginTransaction(SourceRecord sourceRecord) {
        final Struct value = (Struct) sourceRecord.value();
        final Struct key = (Struct) sourceRecord.key();
        final Map<String, ?> offset = sourceRecord.sourceOffset();

        Assertions.assertThat(value.getString("status")).isEqualTo("BEGIN");
        Assertions.assertThat(value.getInt64("event_count")).isNull();

        final String txId = value.getString("id");
        Assertions.assertThat(key.getString("id")).isEqualTo(txId);
        Assertions.assertThat(offset.get("transaction_id")).isEqualTo(txId);
        return txId;
    }

    /**
     * Verifies that the record is a transaction {@code END} marker with the expected counts.
     * <p>
     * Checks the value's {@code status}, {@code id} and {@code event_count}, the key's {@code id},
     * the per-collection event counts in {@code data_collections}, and the offset's
     * {@code transaction_id}.
     *
     * @param sourceRecord the record to verify; its key and value are cast to {@link Struct}
     * @param str the expected transaction id
     * @param j the expected total event count
     * @param map the expected event count per data collection; values are compared as {@code long}
     */
    protected void assertEndTransaction(SourceRecord sourceRecord, String str, long j, Map<String, Number> map) {
        final Struct value = (Struct) sourceRecord.value();
        final Struct key = (Struct) sourceRecord.key();
        final Map<String, ?> offset = sourceRecord.sourceOffset();

        Assertions.assertThat(value.getString("status")).isEqualTo("END");
        Assertions.assertThat(value.getString("id")).isEqualTo(str);
        Assertions.assertThat(value.getInt64("event_count")).isEqualTo(j);
        Assertions.assertThat(key.getString("id")).isEqualTo(str);

        // Counts reported by the record, keyed by data collection name.
        final List<Struct> reported = value.getArray("data_collections");
        final Map<String, Long> actualCounts = reported.stream()
                .collect(Collectors.toMap(item -> item.getString("data_collection"), item -> item.getInt64("event_count")));
        // Expected counts normalized to Long so that the two maps are comparable.
        final Map<String, Long> expectedCounts = map.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, expected -> expected.getValue().longValue()));
        Assertions.assertThat(actualCounts).isEqualTo(expectedCounts);

        Assertions.assertThat(offset.get("transaction_id")).isEqualTo(str);
    }

    /**
     * Verifies the {@code transaction} block of a record and the transaction id in its offset.
     *
     * @param sourceRecord the record to verify; its value is cast to {@link Struct}
     * @param str the expected transaction id
     * @param j the expected {@code total_order}
     * @param j2 the expected {@code data_collection_order}
     */
    protected void assertRecordTransactionMetadata(SourceRecord sourceRecord, String str, long j, long j2) {
        final Struct value = (Struct) sourceRecord.value();
        final Struct transaction = value.getStruct("transaction");
        final Map<String, ?> offset = sourceRecord.sourceOffset();

        Assertions.assertThat(transaction.getString("id")).isEqualTo(str);
        Assertions.assertThat(transaction.getInt64("total_order")).isEqualTo(j);
        Assertions.assertThat(transaction.getInt64("data_collection_order")).isEqualTo(j2);
        Assertions.assertThat(offset.get("transaction_id")).isEqualTo(str);
    }

    /**
     * Returns the wait time read from the system property {@code debezium.test.records.waittime}.
     *
     * @return the parsed property value, or {@code 2} when the property is not set
     * @throws NumberFormatException if the property value is not a valid integer
     */
    public static int waitTimeForRecords() {
        final String configured = System.getProperty("debezium.test.records.waittime", "2");
        return Integer.parseInt(configured);
    }

    /**
     * Returns the wait time read from the system property
     * {@code debezium.test.records.waittime.after.nulls}.
     *
     * @return the parsed property value, or {@code 3} when the property is not set
     * @throws NumberFormatException if the property value is not a valid integer
     */
    public static int waitTimeForRecordsAfterNulls() {
        final String configured = System.getProperty("debezium.test.records.waittime.after.nulls", "3");
        return Integer.parseInt(configured);
    }

    /**
     * Waits until the {@code SnapshotCompleted} attribute of the snapshot metrics MBean is true,
     * without task or database qualifiers.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     * @throws InterruptedException declared by the signature
     */
    public static void waitForSnapshotToBeCompleted(String str, String str2) throws InterruptedException {
        final String awaitedAttribute = "SnapshotCompleted";
        waitForSnapshotEvent(str, str2, awaitedAttribute, null, null);
    }

    /**
     * Waits until the {@code SnapshotCompleted} attribute of the snapshot metrics MBean is true,
     * qualified by task and database.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     * @param str3 the value of the {@code task} key, or {@code null} to omit it
     * @param str4 the value of the {@code database} key, or {@code null} to omit it
     * @throws InterruptedException declared by the signature
     */
    public static void waitForSnapshotToBeCompleted(String str, String str2, String str3, String str4) throws InterruptedException {
        final String awaitedAttribute = "SnapshotCompleted";
        waitForSnapshotEvent(str, str2, awaitedAttribute, str3, str4);
    }

    /**
     * Waits until the {@code SnapshotCompleted} attribute of the snapshot metrics MBean, identified
     * with custom name tags, is true.
     * <p>
     * Polls every 100 ms for at most {@code waitTimeForRecords() * 30} seconds and ignores
     * {@link InstanceNotFoundException} while waiting.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     * @param map additional key/value pairs of the MBean name; entries with null values are skipped
     * @throws InterruptedException declared by the signature
     */
    public static void waitForSnapshotWithCustomMetricsToBeCompleted(String str, String str2, Map<String, String> map) throws InterruptedException {
        final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
        Awaitility.await()
                // The alias is the message shown on timeout, so it must name the snapshot, not streaming.
                .alias("Snapshot was not completed on time")
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS)
                .ignoreException(InstanceNotFoundException.class)
                .until(() -> (boolean) mbeanServer.getAttribute(getSnapshotMetricsObjectName(str, str2, map), "SnapshotCompleted"));
    }

    /**
     * Waits until the given boolean attribute of the snapshot metrics MBean is true.
     * <p>
     * Polls every 100 ms for at most {@code waitTimeForRecords() * 30} seconds and ignores
     * {@link InstanceNotFoundException} while waiting.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     * @param str3 the name of the MBean attribute to poll
     * @param str4 the value of the {@code task} key, or {@code null} to omit it
     * @param str5 the value of the {@code database} key, or {@code null} to omit it
     * @throws InterruptedException declared by the signature
     */
    private static void waitForSnapshotEvent(String str, String str2, String str3, String str4, String str5) throws InterruptedException {
        final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
        Awaitility.await()
                // The alias is the message shown on timeout, so it names the awaited snapshot attribute.
                .alias("Snapshot metric '" + str3 + "' did not become true on time")
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS)
                .ignoreException(InstanceNotFoundException.class)
                .until(() -> (boolean) mbeanServer.getAttribute(getSnapshotMetricsObjectName(str, str2, str4, str5), str3));
    }

    /**
     * Waits until streaming is reported as running, using the namespace from
     * {@code getStreamingNamespace()} as the metrics context.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     * @throws InterruptedException declared by the signature
     */
    public static void waitForStreamingRunning(String str, String str2) throws InterruptedException {
        final String context = getStreamingNamespace();
        waitForStreamingRunning(str, str2, context);
    }

    /**
     * Waits until streaming is reported as running for the given metrics context, without a task
     * qualifier.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     * @param str3 the value of the {@code context} key of the MBean name
     */
    public static void waitForStreamingRunning(String str, String str2, String str3) {
        final String noTask = null;
        waitForStreamingRunning(str, str2, str3, noTask);
    }

    /**
     * Waits until {@code isStreamingRunning(str, str2, str3, str4)} returns true.
     * <p>
     * Polls every 100 ms for at most {@code waitTimeForRecords() * 30} seconds.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     * @param str3 the value of the {@code context} key of the MBean name
     * @param str4 the value of the {@code task} key, or {@code null} to omit it
     */
    public static void waitForStreamingRunning(String str, String str2, String str3, String str4) {
        Awaitility.await()
                .alias("Streaming was not started on time")
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS)
                .ignoreException(InstanceNotFoundException.class)
                .until(() -> isStreamingRunning(str, str2, str3, str4));
    }

    /**
     * Waits until {@code isStreamingRunning(str, str2, map)} returns true for a streaming metrics
     * MBean identified with custom name tags.
     * <p>
     * Polls every 100 ms for at most {@code waitTimeForRecords() * 30} seconds.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     * @param map additional key/value pairs of the MBean name
     */
    public static void waitForStreamingWithCustomMetricsToStart(String str, String str2, Map<String, String> map) {
        Awaitility.await()
                .alias("Streaming was not started on time")
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS)
                .ignoreException(InstanceNotFoundException.class)
                .until(() -> isStreamingRunning(str, str2, map));
    }

    /**
     * Waits until {@code isStreamingRunning(str, str2)} returns false.
     * <p>
     * Polls every 200 ms for at most {@code waitTimeForRecords() * 30} seconds.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     */
    public static void waitForConnectorShutdown(String str, String str2) {
        Awaitility.await()
                .pollInterval(200L, TimeUnit.MILLISECONDS)
                .atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS)
                .until(() -> !isStreamingRunning(str, str2));
    }

    /**
     * Reports whether streaming is running, using the namespace from
     * {@code getStreamingNamespace()} as the metrics context and no task qualifier.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     * @return the result of the four-argument overload
     */
    public static boolean isStreamingRunning(String str, String str2) {
        final String context = getStreamingNamespace();
        return isStreamingRunning(str, str2, context, null);
    }

    /**
     * Reports whether streaming is running for the given metrics context, without a task qualifier.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     * @param str3 the value of the {@code context} key of the MBean name
     * @return the result of the four-argument overload
     */
    public static boolean isStreamingRunning(String str, String str2, String str3) {
        final String noTask = null;
        return isStreamingRunning(str, str2, str3, noTask);
    }

    /**
     * Reads the {@code Connected} attribute of the streaming metrics MBean from the platform
     * MBean server.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     * @param str3 the value of the {@code context} key of the MBean name
     * @param str4 the value of the {@code task} key, or {@code null} to use the name without a task
     * @return the attribute value, or {@code false} when a {@link JMException} is thrown
     */
    public static boolean isStreamingRunning(String str, String str2, String str3, String str4) {
        try {
            final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
            final ObjectName metricsName;
            if (str4 == null) {
                metricsName = getStreamingMetricsObjectName(str, str2, str3);
            }
            else {
                metricsName = getStreamingMetricsObjectName(str, str2, str3, str4);
            }
            return (boolean) mbeanServer.getAttribute(metricsName, "Connected");
        }
        catch (JMException ignored) {
            // Any JMX failure, such as an unregistered MBean, is reported as "not running".
            return false;
        }
    }

    /**
     * Reads the {@code Connected} attribute of the streaming metrics MBean, identified with custom
     * name tags, from the platform MBean server.
     *
     * @param str the connector part of the JMX domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the MBean name
     * @param map additional key/value pairs of the MBean name
     * @return the attribute value, or {@code false} when a {@link JMException} is thrown
     */
    public static boolean isStreamingRunning(String str, String str2, Map<String, String> map) {
        try {
            final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
            final ObjectName metricsName = getStreamingMetricsObjectName(str, str2, map);
            return (boolean) mbeanServer.getAttribute(metricsName, "Connected");
        }
        catch (JMException ignored) {
            // Any JMX failure, such as an unregistered MBean, is reported as "not running".
            return false;
        }
    }

    /**
     * Builds the JMX name of the snapshot metrics MBean:
     * {@code debezium.<str>:type=connector-metrics,context=snapshot,server=<str2>}.
     *
     * @param str the connector part of the JMX domain
     * @param str2 the value of the {@code server} key
     * @return the object name
     * @throws MalformedObjectNameException if the resulting string is not a valid object name
     */
    public static ObjectName getSnapshotMetricsObjectName(String str, String str2) throws MalformedObjectNameException {
        final String name = String.format("debezium.%s:type=connector-metrics,context=snapshot,server=%s", str, str2);
        return new ObjectName(name);
    }

    /**
     * Builds the JMX name of the snapshot metrics MBean qualified by task and database.
     * <p>
     * The two qualifiers are put into a map under the keys {@code task} and {@code database} and
     * passed to the map-based overload.
     *
     * @param str the connector part of the JMX domain
     * @param str2 the value of the {@code server} key
     * @param str3 the value of the {@code task} key; may be {@code null}
     * @param str4 the value of the {@code database} key; may be {@code null}
     * @return the object name
     * @throws MalformedObjectNameException if the resulting string is not a valid object name
     */
    public static ObjectName getSnapshotMetricsObjectName(String str, String str2, String str3, String str4) throws MalformedObjectNameException {
        final Map<String, String> qualifiers = new HashMap<>();
        qualifiers.put("task", str3);
        qualifiers.put("database", str4);
        return getSnapshotMetricsObjectName(str, str2, qualifiers);
    }

    /**
     * Builds the JMX name of the snapshot metrics MBean with additional key/value pairs.
     * <p>
     * Entries with a null value are skipped. The remaining entries are appended as
     * {@code key=value}, comma-separated, in the map's iteration order. When nothing remains, the
     * two-argument overload is used.
     *
     * @param str the connector part of the JMX domain
     * @param str2 the value of the {@code server} key
     * @param map additional key/value pairs of the name
     * @return the object name
     * @throws MalformedObjectNameException if the resulting string is not a valid object name
     */
    public static ObjectName getSnapshotMetricsObjectName(String str, String str2, Map<String, String> map) throws MalformedObjectNameException {
        final StringBuilder tags = new StringBuilder();
        for (Map.Entry<String, String> tag : map.entrySet()) {
            if (tag.getValue() == null) {
                continue;
            }
            if (tags.length() > 0) {
                tags.append(',');
            }
            tags.append(tag.getKey()).append('=').append(tag.getValue());
        }
        if (tags.length() == 0) {
            return getSnapshotMetricsObjectName(str, str2);
        }
        return new ObjectName("debezium." + str + ":type=connector-metrics,context=snapshot,server=" + str2 + "," + tags);
    }

    /**
     * Builds the JMX name of the streaming metrics MBean, using the namespace from
     * {@code getStreamingNamespace()} as the context.
     *
     * @param str the connector part of the JMX domain
     * @param str2 the value of the {@code server} key
     * @return the object name
     * @throws MalformedObjectNameException if the resulting string is not a valid object name
     */
    public static ObjectName getStreamingMetricsObjectName(String str, String str2) throws MalformedObjectNameException {
        final String context = getStreamingNamespace();
        return getStreamingMetricsObjectName(str, str2, context);
    }

    /**
     * Builds the JMX name of a connector metrics MBean for the given context:
     * {@code debezium.<str>:type=connector-metrics,context=<str3>,server=<str2>}.
     *
     * @param str the connector part of the JMX domain
     * @param str2 the value of the {@code server} key
     * @param str3 the value of the {@code context} key
     * @return the object name
     * @throws MalformedObjectNameException if the resulting string is not a valid object name
     */
    public static ObjectName getStreamingMetricsObjectName(String str, String str2, String str3) throws MalformedObjectNameException {
        final String name = String.format("debezium.%s:type=connector-metrics,context=%s,server=%s", str, str3, str2);
        return new ObjectName(name);
    }

    /**
     * Builds the JMX name of a connector metrics MBean for the given context and task:
     * {@code debezium.<str>:type=connector-metrics,context=<str3>,server=<str2>,task=<str4>}.
     *
     * @param str the connector part of the JMX domain
     * @param str2 the value of the {@code server} key
     * @param str3 the value of the {@code context} key
     * @param str4 the value of the {@code task} key
     * @return the object name
     * @throws MalformedObjectNameException if the resulting string is not a valid object name
     */
    public static ObjectName getStreamingMetricsObjectName(String str, String str2, String str3, String str4) throws MalformedObjectNameException {
        final String name = String.format("debezium.%s:type=connector-metrics,context=%s,server=%s,task=%s", str, str3, str2, str4);
        return new ObjectName(name);
    }

    /**
     * Builds the JMX name of the streaming metrics MBean with additional key/value pairs, using the
     * namespace from {@code getStreamingNamespace()} as the context.
     * <p>
     * Entries with a null value are skipped. The remaining entries are appended as
     * {@code key=value}, comma-separated, in the map's iteration order. When nothing remains, the
     * two-argument overload is used.
     *
     * @param str the connector part of the JMX domain
     * @param str2 the value of the {@code server} key
     * @param map additional key/value pairs of the name
     * @return the object name
     * @throws MalformedObjectNameException if the resulting string is not a valid object name
     */
    public static ObjectName getStreamingMetricsObjectName(String str, String str2, Map<String, String> map) throws MalformedObjectNameException {
        final StringBuilder tags = new StringBuilder();
        for (Map.Entry<String, String> tag : map.entrySet()) {
            if (tag.getValue() == null) {
                continue;
            }
            if (tags.length() > 0) {
                tags.append(',');
            }
            tags.append(tag.getKey()).append('=').append(tag.getValue());
        }
        if (tags.length() == 0) {
            return getStreamingMetricsObjectName(str, str2);
        }
        return new ObjectName("debezium." + str + ":type=connector-metrics,context=" + getStreamingNamespace() + ",server=" + str2 + "," + tags);
    }

    /**
     * Returns the metrics context used for streaming MBeans, read from the system property
     * {@code test.streaming.metrics.namespace}.
     *
     * @return the property value, or {@code "streaming"} when the property is not set
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public static String getStreamingNamespace() {
        final String configured = System.getProperty("test.streaming.metrics.namespace");
        return configured != null ? configured : "streaming";
    }
}
