package io.debezium.embedded;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.config.Instantiator;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.StopEngineException;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import io.debezium.util.VariableLatch;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/debezium/embedded/EmbeddedEngine.class */
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord>, EmbeddedEngineConfig {
    // Class logger. Declared without an initializer here; presumably assigned in a static initializer outside this view.
    private static final Logger LOGGER;
    // Engine/connector configuration supplied through the builder.
    private final Configuration config;
    // Time source used for commit-interval bookkeeping and flush deadlines.
    private final Clock clock;
    // Class loader used to load the connector class and custom offset store classes.
    private final ClassLoader classLoader;
    // Application callback that receives each batch of (transformed) records.
    private final DebeziumEngine.ChangeConsumer<SourceRecord> handler;
    // Callback invoked once at the end of run() with the final outcome; never null (the constructor installs a logging default).
    private final DebeziumEngine.CompletionCallback completionCallback;
    // Optional lifecycle callback (connector/task started/stopped); may be null.
    private final DebeziumEngine.ConnectorCallback connectorCallback;
    // JSON converters (configured with schemas.enable=false) handed to the offset reader/writer.
    private final Converter keyConverter;
    private final Converter valueConverter;
    // Worker configuration built from the engine configuration; used to configure the offset store.
    private final WorkerConfig workerConfig;
    // Records the outcome (success flag, message, error) of the current/last run.
    private final CompletionResult completionResult;
    // Policy deciding when offsets are flushed; instantiated from configuration in run() when the builder supplied none.
    private OffsetCommitPolicy offsetCommitPolicy;
    // The single source task created and polled by run().
    private SourceTask task;
    // Transformations applied to every polled record before it reaches the handler.
    private final Transformations transformations;
    // Decompiler-exposed synthetic flag backing the original 'assert' statements.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Thread currently executing run(), or null when the engine is not running.
    private final AtomicReference<Thread> runningThread = new AtomicReference<>();
    // Counted up when run() starts and counted down when it finishes.
    private final VariableLatch latch = new VariableLatch(0);
    // Number of records marked processed since the last successful offset commit.
    private long recordsSinceLastCommit = 0;
    // Clock reading (millis) taken at the last successful offset commit, or when polling started.
    private long timeOfLastCommitMillis = 0;

    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$CompletionResult.class */
    /**
     * A {@link DebeziumEngine.CompletionCallback} that remembers the reported outcome and lets
     * other threads wait for it. An optional delegate callback is notified after the outcome
     * has been stored and the internal latch released.
     */
    public static class CompletionResult implements DebeziumEngine.CompletionCallback {
        private final DebeziumEngine.CompletionCallback delegate;
        private final CountDownLatch completed;
        private boolean success;
        private String message;
        private Throwable error;

        /** Creates a result holder without a delegate callback. */
        public CompletionResult() {
            this(null);
        }

        /**
         * Creates a result holder that forwards every outcome to the given delegate.
         *
         * @param delegate callback to notify after the outcome is recorded; may be {@code null}
         */
        public CompletionResult(DebeziumEngine.CompletionCallback delegate) {
            this.delegate = delegate;
            this.completed = new CountDownLatch(1);
        }

        /**
         * Stores the outcome, releases any waiting threads and then notifies the delegate, if any.
         *
         * @param success whether the engine completed successfully
         * @param message human-readable completion message
         * @param error the failure cause; may be {@code null}
         */
        public void handle(boolean success, String message, Throwable error) {
            this.success = success;
            this.message = message;
            this.error = error;
            // Release waiters before calling the delegate, as the delegate may block or throw.
            completed.countDown();
            if (delegate == null) {
                return;
            }
            delegate.handle(success, message, error);
        }

        /**
         * Blocks until an outcome has been reported.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void await() throws InterruptedException {
            completed.await();
        }

        /**
         * Blocks until an outcome has been reported or the timeout elapses.
         *
         * @param timeout maximum time to wait
         * @param unit unit of {@code timeout}
         * @return {@code true} if an outcome was reported before the timeout elapsed
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return completed.await(timeout, unit);
        }

        /** @return {@code true} once an outcome has been reported */
        public boolean hasCompleted() {
            final long remaining = completed.getCount();
            return remaining == 0L;
        }

        /** @return the success flag of the last reported outcome */
        public boolean success() {
            return success;
        }

        /** @return the message of the last reported outcome */
        public String message() {
            return message;
        }

        /** @return the error of the last reported outcome, or {@code null} */
        public Throwable error() {
            return error;
        }

        /** @return {@code true} if the last reported outcome carried a non-null error */
        public boolean hasError() {
            return null != error;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$EmbeddedEngineRuntimeException.class */
    /**
     * Marker exception thrown by {@code failAndThrow(...)} after the failure has already been
     * recorded; it carries neither a message nor a cause.
     */
    public static class EmbeddedEngineRuntimeException extends RuntimeException {
        /** Creates the marker exception without message or cause. */
        EmbeddedEngineRuntimeException() {
            super();
        }
    }

    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$EngineBuilder.class */
    /**
     * Builder for {@link EmbeddedEngine}. The configuration and a change handler are mandatory;
     * class loader and clock fall back to defaults when not supplied.
     *
     * <p>The builder methods may be called in any order: the decision to disable tombstone
     * events for consumers that do not support them is applied in {@link #m1build()} against
     * the final configuration, rather than at the time {@code notifying(...)} is called.
     */
    public static final class EngineBuilder implements DebeziumEngine.Builder<SourceRecord> {
        private Configuration config;
        private DebeziumEngine.ChangeConsumer<SourceRecord> handler;
        private ClassLoader classLoader;
        private Clock clock;
        private DebeziumEngine.CompletionCallback completionCallback;
        private DebeziumEngine.ConnectorCallback connectorCallback;
        private OffsetCommitPolicy offsetCommitPolicy = null;
        // True when the most recently registered ChangeConsumer reported that it does not support tombstone events.
        private boolean disableTombstonesIfUnset = false;

        /**
         * Sets the engine/connector configuration.
         *
         * @param properties configuration properties
         * @return this builder
         */
        public DebeziumEngine.Builder using(Properties properties) {
            this.config = Configuration.from(properties);
            return this;
        }

        /**
         * Sets the class loader used to load connector and offset store classes.
         *
         * @param classLoader the class loader; when never set, {@code Instantiator.getClassLoader()} is used
         * @return this builder
         */
        public DebeziumEngine.Builder using(ClassLoader classLoader) {
            this.classLoader = classLoader;
            return this;
        }

        /**
         * Sets the callback invoked when the engine completes.
         *
         * @param completionCallback the callback; may be {@code null}
         * @return this builder
         */
        public DebeziumEngine.Builder using(DebeziumEngine.CompletionCallback completionCallback) {
            this.completionCallback = completionCallback;
            return this;
        }

        /**
         * Sets the connector lifecycle callback.
         *
         * @param connectorCallback the callback; may be {@code null}
         * @return this builder
         */
        public DebeziumEngine.Builder using(DebeziumEngine.ConnectorCallback connectorCallback) {
            this.connectorCallback = connectorCallback;
            return this;
        }

        /**
         * Sets the offset commit policy.
         *
         * @param offsetCommitPolicy the policy; when {@code null} the engine instantiates one from configuration
         * @return this builder
         */
        public DebeziumEngine.Builder using(OffsetCommitPolicy offsetCommitPolicy) {
            this.offsetCommitPolicy = offsetCommitPolicy;
            return this;
        }

        /**
         * Registers a per-record consumer, wrapped in the default batch-handling change consumer.
         *
         * @param consumer receives each record
         * @return this builder
         */
        public DebeziumEngine.Builder notifying(Consumer<SourceRecord> consumer) {
            this.handler = EmbeddedEngine.buildDefaultChangeConsumer(consumer);
            // The tombstone check only ever applied to explicitly supplied ChangeConsumers.
            this.disableTombstonesIfUnset = false;
            return this;
        }

        /**
         * Registers a batch change consumer. If the consumer reports that it does not support
         * tombstone events and the configuration does not set the tombstone option explicitly,
         * {@link #m1build()} sets that option to {@code false}.
         *
         * @param changeConsumer receives each batch of records
         * @return this builder
         */
        public DebeziumEngine.Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> changeConsumer) {
            this.handler = changeConsumer;
            // Only remember the decision here: the configuration may not have been supplied yet
            // (touching it would throw a NullPointerException) and may still be replaced by a
            // later using(Properties) call, which would otherwise lose the override.
            this.disableTombstonesIfUnset = changeConsumer != null && !changeConsumer.supportsTombstoneEvents();
            return this;
        }

        /**
         * Sets the clock from a {@link java.time.Clock}.
         *
         * @param clock source of the current time in milliseconds
         * @return this builder
         */
        public DebeziumEngine.Builder using(final java.time.Clock clock) {
            this.clock = new Clock() { // from class: io.debezium.embedded.EmbeddedEngine.EngineBuilder.1
                public long currentTimeInMillis() {
                    // 'clock' is the java.time.Clock parameter captured by this adapter.
                    return clock.millis();
                }
            };
            return this;
        }

        /**
         * Builds the engine, applying defaults for the class loader and clock.
         *
         * @return a new engine
         * @throws NullPointerException if no configuration or no handler was supplied
         */
        /* renamed from: build, reason: merged with bridge method [inline-methods] */
        public EmbeddedEngine m1build() {
            if (this.classLoader == null) {
                this.classLoader = Instantiator.getClassLoader();
            }
            if (this.clock == null) {
                this.clock = Clock.system();
            }
            Objects.requireNonNull(this.config, "A connector configuration must be specified.");
            Objects.requireNonNull(this.handler, "A connector consumer or changeHandler must be specified.");
            if (this.disableTombstonesIfUnset && !this.config.hasKey(CommonConnectorConfig.TOMBSTONES_ON_DELETE.name())) {
                EmbeddedEngine.LOGGER.info("Consumer doesn't support tombstone events, setting '{}' to false.", CommonConnectorConfig.TOMBSTONES_ON_DELETE.name());
                this.config = this.config.edit().with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).build();
            }
            return new EmbeddedEngine(this.config, this.classLoader, this.clock, this.handler, this.completionCallback, this.connectorCallback, this.offsetCommitPolicy);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$HandlerErrors.class */
    /**
     * Mutable holder for the two kinds of errors collected while polling: an error recorded as
     * the handler error and an error left over from retrying a retriable failure.
     */
    public class HandlerErrors {
        private Throwable handlerError;
        private Throwable retryError;

        /**
         * @param initialHandlerError initial handler error; may be {@code null}
         * @param initialRetryError initial retry error; may be {@code null}
         */
        HandlerErrors(Throwable initialHandlerError, Throwable initialRetryError) {
            this.retryError = initialRetryError;
            this.handlerError = initialHandlerError;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$SourceRecordOffsets.class */
    /**
     * {@link DebeziumEngine.Offsets} implementation backed by a {@link HashMap}; the collected
     * entries are used as the source offset when a record is marked processed with custom offsets.
     */
    public class SourceRecordOffsets implements DebeziumEngine.Offsets {
        private final HashMap<String, Object> offsets;

        protected SourceRecordOffsets() {
            this.offsets = new HashMap<>();
        }

        /**
         * Stores (or replaces) one offset entry.
         *
         * @param key offset key
         * @param value offset value
         */
        public void set(String key, Object value) {
            offsets.put(key, value);
        }

        /** @return the live, mutable map of collected offsets (not a copy) */
        protected HashMap<String, Object> getOffsets() {
            return offsets;
        }
    }

    /**
     * Adapts a per-record {@link Consumer} to a batch {@link DebeziumEngine.ChangeConsumer}.
     * Every record is passed to the consumer and then marked processed; a record whose
     * processing raised {@link StopEngineException} is still marked processed before the
     * exception is rethrown. The batch is marked finished only if all records were handled.
     *
     * @param consumer receives each record of a batch in order
     * @return the batch-oriented adapter
     */
    private static DebeziumEngine.ChangeConsumer<SourceRecord> buildDefaultChangeConsumer(final Consumer<SourceRecord> consumer) {
        return new DebeziumEngine.ChangeConsumer<SourceRecord>() { // from class: io.debezium.embedded.EmbeddedEngine.1
            public void handleBatch(List<SourceRecord> batch, DebeziumEngine.RecordCommitter<SourceRecord> committer) throws InterruptedException {
                for (final SourceRecord current : batch) {
                    try {
                        consumer.accept(current);
                        committer.markProcessed(current);
                    } catch (StopEngineException stopRequest) {
                        // The record counts as processed even though it asked the engine to stop.
                        committer.markProcessed(current);
                        throw stopRequest;
                    }
                }
                committer.markBatchFinished();
            }
        };
    }

    private EmbeddedEngine(Configuration configuration, ClassLoader classLoader, Clock clock, DebeziumEngine.ChangeConsumer<SourceRecord> changeConsumer, DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback, OffsetCommitPolicy offsetCommitPolicy) {
        this.config = configuration;
        this.handler = changeConsumer;
        this.classLoader = classLoader;
        this.clock = clock;
        this.completionCallback = completionCallback != null ? completionCallback : (z, str, th) -> {
            if (z) {
                return;
            }
            LOGGER.error(str, th);
        };
        this.connectorCallback = connectorCallback;
        this.completionResult = new CompletionResult();
        this.offsetCommitPolicy = offsetCommitPolicy;
        if (!$assertionsDisabled && this.config == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.handler == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.classLoader == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.clock == null) {
            throw new AssertionError();
        }
        Map singletonMap = Collections.singletonMap("schemas.enable", "false");
        this.keyConverter = (Converter) Instantiator.getInstance(JsonConverter.class.getName());
        this.keyConverter.configure(singletonMap, true);
        this.valueConverter = (Converter) Instantiator.getInstance(JsonConverter.class.getName());
        this.valueConverter.configure(singletonMap, false);
        this.transformations = new Transformations(configuration);
        this.workerConfig = new EmbeddedWorkerConfig(configuration.asMap(EmbeddedEngineConfig.ALL_FIELDS));
    }

    /**
     * @return {@code true} while a thread is registered as executing {@code run()}
     */
    public boolean isRunning() {
        final Thread runner = this.runningThread.get();
        return runner != null;
    }

    /**
     * Records a failure that has no associated throwable.
     *
     * @param message failure description
     */
    private void fail(String message) {
        final Throwable noCause = null;
        fail(message, noCause);
    }

    /**
     * Records a failure in the completion result. If the result already carries an error, that
     * earlier error is kept and this failure is only logged.
     *
     * @param message failure description
     * @param cause the failure cause; may be {@code null}
     */
    private void fail(String message, Throwable cause) {
        if (!this.completionResult.hasError()) {
            this.completionResult.handle(false, message, cause);
            return;
        }
        // An error was already recorded; keep it and just log this one.
        LOGGER.error(message, cause);
    }

    /**
     * Records a failure and then aborts the current operation by throwing the marker exception.
     * This method never returns normally.
     *
     * @param message failure description
     * @param cause the failure cause; may be {@code null}
     * @throws EmbeddedEngineRuntimeException always
     */
    private void failAndThrow(String message, Throwable cause) throws EmbeddedEngineRuntimeException {
        fail(message, cause);
        final EmbeddedEngineRuntimeException abort = new EmbeddedEngineRuntimeException();
        throw abort;
    }

    /**
     * Records a successful outcome in the completion result.
     *
     * @param message completion message
     */
    private void succeed(String message) {
        final Throwable noError = null;
        this.completionResult.handle(true, message, noError);
    }

    /* JADX WARN: Finally extract failed */
    /**
     * Runs the engine on the calling thread until polling ends or a failure occurs. If another
     * thread is already registered as running the engine, this call returns immediately.
     *
     * <p>Sequence: validate configuration, instantiate the connector, start the offset store,
     * resolve the commit policy, start the connector and its single task, poll records, then
     * stop the task (committing offsets), the offset store and the connector. Failures are
     * recorded in the completion result; the completion callback is invoked when the run ends.
     *
     * <p>Cleanup is expressed with {@code finally} blocks so that each step (stopping the task,
     * stopping the offset store and connector, releasing the latch, clearing the running thread,
     * invoking the completion callback) executes exactly once per run, on success and on failure.
     */
    public void run() {
        if (!this.runningThread.compareAndSet(null, Thread.currentThread())) {
            // Some thread is already running this engine.
            return;
        }
        final String engineName = this.config.getString(EmbeddedEngineConfig.ENGINE_NAME);
        final String connectorClassName = this.config.getString(EmbeddedEngineConfig.CONNECTOR_CLASS);
        final Optional<DebeziumEngine.ConnectorCallback> callback = Optional.ofNullable(this.connectorCallback);
        this.latch.countUp();
        try {
            if (!this.config.validateAndRecord(EmbeddedEngineConfig.CONNECTOR_FIELDS, LOGGER::error)) {
                failAndThrow("Failed to start connector with invalid configuration (see logs for actual errors)", null);
            }
            final SourceConnector connector = instantiateConnector(connectorClassName);
            final Map<String, String> connectorConfig = getConnectorConfig(connector, connectorClassName);
            final OffsetBackingStore offsetStore = initializeOffsetStore(connectorConfig);
            setOffsetCommitPolicy();
            final Duration commitTimeout = Duration.ofMillis(this.config.getLong(EmbeddedEngineConfig.OFFSET_COMMIT_TIMEOUT_MS));
            final OffsetStorageReaderImpl offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName, this.keyConverter, this.valueConverter);
            final OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName, this.keyConverter, this.valueConverter);
            initializeConnector(connector, offsetReader);
            try {
                connector.start(connectorConfig);
                callback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted);
                final List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
                final Class<? extends Task> taskClass = connector.taskClass();
                this.task = createSourceTask(taskConfigs, taskClass);
                try {
                    startSourceTask(taskConfigs, offsetReader);
                    callback.ifPresent(DebeziumEngine.ConnectorCallback::taskStarted);
                } catch (Throwable startError) {
                    stopSourceTask();
                    failAndThrow("Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: " + Configuration.from(taskConfigs.get(0)).withMaskedPasswords(), startError);
                }
                this.recordsSinceLastCommit = 0L;
                final HandlerErrors handlerErrors = new HandlerErrors(null, null);
                try {
                    this.timeOfLastCommitMillis = this.clock.currentTimeInMillis();
                    pollRecords(taskConfigs, buildRecordCommitter(offsetWriter, this.task, commitTimeout), handlerErrors);
                } finally {
                    // Runs whether polling returned or threw: publish the outcome, then stop the task and commit.
                    setCompletionResult(connectorClassName, handlerErrors);
                    stopTaskAndCommitOffset(offsetWriter, commitTimeout, callback);
                }
            } catch (Throwable runError) {
                // The marker exception means the failure has already been recorded by failAndThrow().
                if (!(runError instanceof EmbeddedEngineRuntimeException)) {
                    fail("Error while trying to run connector class '" + connectorClassName + "'", runError);
                }
            } finally {
                stopOffsetStoreAndConnector(connector, connectorClassName, offsetStore, callback);
            }
        } catch (EmbeddedEngineRuntimeException e) {
            LOGGER.debug("Failed to run EmbeddedEngine.", e);
        } finally {
            this.latch.countDown();
            this.runningThread.set(null);
            this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
        }
    }

    /**
     * Loads the connector class through the engine's class loader and creates an instance with
     * its no-argument constructor.
     *
     * @param str fully-qualified connector class name
     * @return the new connector instance
     * @throws EmbeddedEngineRuntimeException if loading or instantiation fails (the failure is recorded first)
     */
    private SourceConnector instantiateConnector(String str) throws EmbeddedEngineRuntimeException {
        SourceConnector connector = null;
        try {
            final Class<?> connectorClass = this.classLoader.loadClass(str);
            connector = (SourceConnector) connectorClass.getDeclaredConstructor().newInstance();
        } catch (Throwable cause) {
            // failAndThrow never returns normally.
            failAndThrow("Unable to instantiate connector class '" + str + "'", cause);
        }
        return connector;
    }

    /**
     * Validates the worker configuration's original string properties against the connector
     * and returns them when no validation errors are reported.
     *
     * @param sourceConnector connector used to validate the properties
     * @param str connector class name, passed to the validation result generator
     * @return the validated properties
     * @throws EmbeddedEngineRuntimeException if validation throws or reports errors (the failure is recorded first)
     */
    private Map<String, String> getConnectorConfig(SourceConnector sourceConnector, String str) throws EmbeddedEngineRuntimeException {
        final Map<String, String> connectorProps = this.workerConfig.originalsStrings();
        Config validated = null;
        try {
            validated = sourceConnector.validate(connectorProps);
        } catch (Exception validationFailure) {
            failAndThrow("Connector configuration is not valid: " + validationFailure.getMessage(), validationFailure);
        }
        final ConfigInfos infos = AbstractHerder.generateResult(str, Collections.emptyMap(), validated.configValues(), sourceConnector.config().groups());
        if (infos.errorCount() <= 0) {
            return connectorProps;
        }
        // Concatenate every per-option error message into one space-separated string.
        final String allErrors = infos.values().stream()
                .flatMap(info -> info.configValue().errors().stream())
                .collect(Collectors.joining(" "));
        failAndThrow("Connector configuration is not valid. " + allErrors, null);
        return connectorProps;
    }

    /**
     * Creates, configures and starts the offset backing store named by the
     * {@code OFFSET_STORAGE} setting. The memory, file and Kafka stores are created through
     * {@code KafkaConnectUtil}; any other class name is loaded through the engine's class
     * loader and instantiated with its no-argument constructor.
     *
     * @param map connector configuration, passed to the Kafka-backed store factory
     * @return the started offset store
     * @throws EmbeddedEngineRuntimeException if the store cannot be created, configured or started
     *         (the failure is recorded first; a store that failed to start is stopped)
     */
    private OffsetBackingStore initializeOffsetStore(Map<String, String> map) throws EmbeddedEngineRuntimeException {
        final String storeClassName = this.config.getString(EmbeddedEngineConfig.OFFSET_STORAGE);
        // Typed as the common interface: the store may be any of the four variants below.
        OffsetBackingStore store = null;
        try {
            if (storeClassName.equals(MemoryOffsetBackingStore.class.getName())) {
                store = KafkaConnectUtil.memoryOffsetBackingStore();
            } else if (storeClassName.equals(FileOffsetBackingStore.class.getName())) {
                store = KafkaConnectUtil.fileOffsetBackingStore();
            } else if (storeClassName.equals(KafkaOffsetBackingStore.class.getName())) {
                store = KafkaConnectUtil.kafkaOffsetBackingStore(map);
            } else {
                store = (OffsetBackingStore) this.classLoader.loadClass(storeClassName).getDeclaredConstructor().newInstance();
            }
        } catch (Throwable cause) {
            // failAndThrow never returns normally, so 'store' is non-null below.
            failAndThrow("Unable to instantiate OffsetBackingStore class '" + storeClassName + "'", cause);
        }
        try {
            store.configure(this.workerConfig);
            store.start();
            return store;
        } catch (Throwable cause) {
            fail("Unable to configure and start the '" + storeClassName + "' offset backing store", cause);
            store.stop();
            throw new EmbeddedEngineRuntimeException();
        }
    }

    /**
     * Instantiates the offset commit policy named by the {@code OFFSET_COMMIT_POLICY} setting
     * when the builder did not supply one; otherwise leaves the supplied policy untouched.
     *
     * @throws EmbeddedEngineRuntimeException if the policy cannot be instantiated (the failure is recorded first)
     */
    private void setOffsetCommitPolicy() throws EmbeddedEngineRuntimeException {
        if (this.offsetCommitPolicy != null) {
            return;
        }
        final String policyClassName = this.config.getString(EmbeddedEngineConfig.OFFSET_COMMIT_POLICY);
        try {
            this.offsetCommitPolicy = (OffsetCommitPolicy) Instantiator.getInstanceWithProperties(policyClassName, this.config.asProperties());
        } catch (Throwable cause) {
            // Report the policy class that failed (previously the offset storage class was named here).
            failAndThrow("Unable to instantiate OffsetCommitPolicy class '" + policyClassName + "'", cause);
        }
    }

    /**
     * Initializes the connector with a context that ignores task reconfiguration requests,
     * records raised errors as engine failures and exposes the given offset reader.
     *
     * @param sourceConnector connector to initialize
     * @param offsetStorageReader reader returned by the context
     */
    private void initializeConnector(SourceConnector sourceConnector, final OffsetStorageReader offsetStorageReader) {
        final SourceConnectorContext context = new SourceConnectorContext() { // from class: io.debezium.embedded.EmbeddedEngine.2
            public void requestTaskReconfiguration() {
                // Intentionally a no-op.
            }

            public void raiseError(Exception raised) {
                EmbeddedEngine.this.fail(raised.getMessage(), raised);
            }

            public OffsetStorageReader offsetStorageReader() {
                return offsetStorageReader;
            }
        };
        sourceConnector.initialize(context);
    }

    /**
     * Instantiates the connector's task class with its no-argument constructor.
     *
     * @param list task configurations; must not be empty
     * @param cls task class to instantiate
     * @return the new task instance
     * @throws EmbeddedEngineRuntimeException if there is no task configuration or the class cannot
     *         be instantiated or accessed (the failure is recorded first)
     * @throws NoSuchMethodException if the task class has no no-argument constructor
     * @throws InvocationTargetException if the task constructor itself throws
     */
    private SourceTask createSourceTask(List<Map<String, String>> list, Class<? extends Task> cls) throws EmbeddedEngineRuntimeException, NoSuchMethodException, InvocationTargetException {
        if (list.isEmpty()) {
            failAndThrow("Unable to start connector's task class '" + cls.getName() + "' with no task configuration", null);
        }
        try {
            return (SourceTask) cls.getDeclaredConstructor().newInstance();
        } catch (IllegalAccessException | InstantiationException cause) {
            // failAndThrow never returns normally; the return below only satisfies the compiler.
            failAndThrow("Unable to instantiate connector's task class '" + cls.getName() + "'", cause);
            return null;
        }
    }

    /**
     * Initializes the task with a context exposing the given offset reader and starts it with
     * the first task configuration.
     *
     * @param list task configurations; only the first entry is used
     * @param offsetStorageReader reader returned by the task context
     */
    private void startSourceTask(List<Map<String, String>> list, final OffsetStorageReader offsetStorageReader) {
        final SourceTaskContext taskContext = new SourceTaskContext() { // from class: io.debezium.embedded.EmbeddedEngine.3
            public OffsetStorageReader offsetStorageReader() {
                return offsetStorageReader;
            }

            public Map<String, String> configs() {
                // NOTE(review): always null, so a task that calls context.configs() gets no configuration — confirm this is intended.
                return null;
            }
        };
        this.task.initialize(taskContext);
        final Map<String, String> firstTaskConfig = list.get(0);
        this.task.start(firstTaskConfig);
    }

    /**
     * Stops the task on a best-effort basis: any error raised while stopping is logged (with
     * its stack trace) and otherwise ignored so that the caller's own failure handling proceeds.
     */
    private void stopSourceTask() {
        try {
            LOGGER.debug("Stopping the task");
            this.task.stop();
        } catch (Throwable stopError) {
            // Deliberately not rethrown; pass the throwable along so the cause is not lost from the log.
            LOGGER.info("Error while trying to stop the task", stopError);
        }
    }

    /**
     * Reacts to a retriable exception raised while polling by restarting the task (stop, then
     * start with the first task configuration), sleeping between failed attempts according to
     * the configured delay strategy.
     *
     * <p>With a retry limit of {@code 0} or below {@code -1}, no restart is attempted and the
     * original exception is returned. With {@code -1}, restarts are attempted without limit.
     *
     * @param retriableException the exception that triggered the retry handling
     * @param list task configurations; only the first entry is used
     * @return {@code null} if the task was restarted; otherwise the exception to fail with
     */
    private Throwable handleRetries(RetriableException retriableException, List<Map<String, String>> list) {
        final int maxRetries = getErrorsMaxRetries();
        LOGGER.info("Retriable exception thrown, connector will be restarted; errors.max.retries={}", Integer.valueOf(maxRetries), retriableException);
        if (maxRetries == 0) {
            return retriableException;
        }
        if (maxRetries < -1) {
            LOGGER.warn("Setting {}={} is deprecated. To disable retries on connection errors, set {}=0", new Object[]{EmbeddedEngineConfig.ERRORS_MAX_RETRIES.name(), Integer.valueOf(maxRetries), EmbeddedEngineConfig.ERRORS_MAX_RETRIES.name()});
            return retriableException;
        }
        final DelayStrategy backoff = delayStrategy(this.config);
        int attempt = 0;
        for (;;) {
            boolean restarted;
            try {
                attempt++;
                LOGGER.info("Starting connector, attempt {}", Integer.valueOf(attempt));
                this.task.stop();
                this.task.start(list.get(0));
                restarted = true;
            } catch (Exception restartFailure) {
                final boolean limited = maxRetries != -1;
                if (limited && attempt >= maxRetries) {
                    LOGGER.error("Can't start the connector, max retries to connect exceeded; stopping connector...", restartFailure);
                    return restartFailure;
                }
                LOGGER.error("Can't start the connector, will retry later...", restartFailure);
                restarted = false;
            }
            // The strategy is consulted after every attempt, including the successful one.
            backoff.sleepWhen(!restarted);
            if (restarted) {
                return null;
            }
        }
    }

    /**
     * Polls the task in a loop while a running thread is registered, applies the configured
     * transformations to each batch (dropping records transformed to {@code null}) and passes
     * non-empty batches to the handler.
     *
     * <p>The loop ends when: polling is interrupted; the handler throws
     * {@link StopEngineException}; retry handling for a retriable poll failure gives up (the
     * resulting error is stored in {@code handlerErrors.retryError} and thrown); or the
     * transformation or the handler throws anything else, in which case the throwable is
     * stored in {@code handlerErrors.handlerError} and the method returns normally.
     *
     * @param list task configurations, used when the task has to be restarted
     * @param recordCommitter committer handed to the handler with each batch
     * @param handlerErrors receives the handler or retry error, if any
     * @throws Throwable the retry error, or any non-retriable failure raised by {@code poll()}
     */
    private void pollRecords(List<Map<String, String>> list, DebeziumEngine.RecordCommitter recordCommitter, HandlerErrors handlerErrors) throws Throwable {
        while (this.runningThread.get() != null) {
            List<SourceRecord> records = null;
            try {
                LOGGER.debug("Embedded engine is polling task for records on thread {}", this.runningThread.get());
                records = this.task.poll();
                LOGGER.debug("Embedded engine returned from polling task for records");
            } catch (InterruptedException e) {
                LOGGER.debug("Embedded engine interrupted on thread {} while polling the task for records", this.runningThread.get());
                // Restore the interrupt flag only when this thread is still the registered runner.
                if (this.runningThread.get() == Thread.currentThread()) {
                    Thread.currentThread().interrupt();
                }
                return;
            } catch (RetriableException e) {
                handlerErrors.retryError = handleRetries(e, list);
                if (handlerErrors.retryError != null) {
                    throw handlerErrors.retryError;
                }
                // Task was restarted; 'records' stays null and the loop polls again.
            }
            try {
                if (records != null && !records.isEmpty()) {
                    LOGGER.debug("Received {} records from the task", Integer.valueOf(records.size()));
                    records = records.stream()
                            .map(this.transformations::transform)
                            .filter(Objects::nonNull)
                            .collect(Collectors.toList());
                }
                if (records == null || records.isEmpty()) {
                    LOGGER.debug("Received no records from the task");
                    continue;
                }
                LOGGER.debug("Received {} transformed records from the task", Integer.valueOf(records.size()));
                try {
                    this.handler.handleBatch(records, recordCommitter);
                } catch (StopEngineException stopRequest) {
                    // The handler asked for an orderly stop; this is not an error.
                    return;
                }
            } catch (Throwable processingError) {
                // Transformation or handler failure: record it so the completion result names the
                // handler as the cause, then stop polling.
                handlerErrors.handlerError = processingError;
                return;
            }
        }
    }

    /**
     * Publishes the outcome of polling: a handler error takes precedence over a retry error;
     * when neither is present the run is recorded as successful.
     *
     * @param str connector class name used in the success message
     * @param handlerErrors errors collected while polling
     */
    private void setCompletionResult(String str, HandlerErrors handlerErrors) {
        final Throwable handlerFailure = handlerErrors.handlerError;
        if (handlerFailure != null) {
            fail("Stopping connector after error in the application's handler method: " + handlerFailure.getMessage(), handlerFailure);
            return;
        }
        final Throwable retryFailure = handlerErrors.retryError;
        if (retryFailure != null) {
            fail("Stopping connector after retry error: " + retryFailure.getMessage(), retryFailure);
            return;
        }
        succeed("Connector '" + str + "' completed normally.");
    }

    /**
     * Stops the task, notifies the connector callback and commits offsets. An interruption is
     * logged and the interrupt flag restored; any other failure is recorded via {@code fail}.
     * If stopping the task throws, offsets are not committed.
     *
     * @param offsetStorageWriter writer holding the offsets to flush
     * @param duration maximum time to wait for the flush
     * @param optional connector callback, if one was registered
     */
    private void stopTaskAndCommitOffset(OffsetStorageWriter offsetStorageWriter, Duration duration, Optional<DebeziumEngine.ConnectorCallback> optional) {
        try {
            LOGGER.info("Stopping the task and engine");
            this.task.stop();
            optional.ifPresent(DebeziumEngine.ConnectorCallback::taskStopped);
            commitOffsets(offsetStorageWriter, duration, this.task);
        } catch (InterruptedException interrupted) {
            LOGGER.debug("Interrupted while committing offsets");
            Thread.currentThread().interrupt();
        } catch (Throwable cause) {
            fail("Error while trying to stop the task and commit the offsets", cause);
        }
    }

    /**
     * Stops the offset store and then the connector. The connector is stopped (and the
     * {@code connectorStopped} callback fired) exactly once, whether or not stopping the offset
     * store failed. Failures of either step are recorded via {@code fail} and never propagate.
     *
     * @param sourceConnector connector to stop
     * @param str connector class name used in error messages
     * @param offsetBackingStore offset store to stop
     * @param optional connector callback, if one was registered
     */
    private void stopOffsetStoreAndConnector(SourceConnector sourceConnector, String str, OffsetBackingStore offsetBackingStore, Optional<DebeziumEngine.ConnectorCallback> optional) {
        try {
            offsetBackingStore.stop();
        } catch (Throwable storeError) {
            fail("Error while trying to stop the offset store", storeError);
        } finally {
            // Always attempt to stop the connector, even when the offset store failed to stop.
            try {
                sourceConnector.stop();
                optional.ifPresent(DebeziumEngine.ConnectorCallback::connectorStopped);
            } catch (Throwable connectorError) {
                fail("Error while trying to stop connector class '" + str + "'", connectorError);
            }
        }
    }

    /**
     * @return the configured value of the {@code ERRORS_MAX_RETRIES} setting
     */
    private int getErrorsMaxRetries() {
        final int maxRetries = this.config.getInteger(EmbeddedEngineConfig.ERRORS_MAX_RETRIES);
        return maxRetries;
    }

    /**
     * Creates the committer handed to the change handler. Marking a record processed notifies
     * the task, bumps the engine's uncommitted-record counter and stages the record's offset in
     * the writer; marking a batch finished flushes offsets if the commit policy says so.
     *
     * @param offsetStorageWriter writer that stages and flushes offsets
     * @param sourceTask task notified of each committed record
     * @param duration maximum time to wait for an offset flush
     * @return the record committer
     */
    protected DebeziumEngine.RecordCommitter buildRecordCommitter(final OffsetStorageWriter offsetStorageWriter, final SourceTask sourceTask, final Duration duration) {
        return new DebeziumEngine.RecordCommitter<SourceRecord>() { // from class: io.debezium.embedded.EmbeddedEngine.4
            public synchronized void markProcessed(SourceRecord processed) throws InterruptedException {
                sourceTask.commitRecord(processed);
                EmbeddedEngine.this.recordsSinceLastCommit += 1;
                offsetStorageWriter.offset(processed.sourcePartition(), processed.sourceOffset());
            }

            public synchronized void markBatchFinished() throws InterruptedException {
                final EmbeddedEngine engine = EmbeddedEngine.this;
                engine.maybeFlush(offsetStorageWriter, engine.offsetCommitPolicy, duration, sourceTask);
            }

            public synchronized void markProcessed(SourceRecord processed, DebeziumEngine.Offsets customOffsets) throws InterruptedException {
                // Same record, but with the caller-supplied offsets in place of the record's own source offset.
                final SourceRecord withCustomOffsets = new SourceRecord(
                        processed.sourcePartition(),
                        ((SourceRecordOffsets) customOffsets).getOffsets(),
                        processed.topic(),
                        processed.kafkaPartition(),
                        processed.keySchema(),
                        processed.key(),
                        processed.valueSchema(),
                        processed.value(),
                        processed.timestamp(),
                        processed.headers());
                markProcessed(withCustomOffsets);
            }

            public DebeziumEngine.Offsets buildOffsets() {
                return new SourceRecordOffsets();
            }
        };
    }

    /**
     * Commits offsets if the supplied policy says a commit is due.
     *
     * <p>The policy is asked with the number of records counted since the last commit and the
     * time elapsed (per this engine's clock) since the last commit timestamp.
     *
     * @param offsetStorageWriter writer whose staged offsets would be flushed
     * @param offsetCommitPolicy policy deciding whether to commit now
     * @param duration timeout passed on to {@code commitOffsets}
     * @param sourceTask task passed on to {@code commitOffsets}
     * @throws InterruptedException propagated from {@code commitOffsets}
     */
    protected void maybeFlush(OffsetStorageWriter offsetStorageWriter, OffsetCommitPolicy offsetCommitPolicy, Duration duration, SourceTask sourceTask) throws InterruptedException {
        final long pendingRecords = this.recordsSinceLastCommit;
        final Duration sinceLastCommit = Duration.ofMillis(this.clock.currentTimeInMillis() - this.timeOfLastCommitMillis);
        if (!offsetCommitPolicy.performCommit(pendingRecords, sinceLastCommit)) {
            return;
        }
        commitOffsets(offsetStorageWriter, duration, sourceTask);
    }

    /**
     * Flushes the offsets staged in the writer and, on success, commits the source task.
     *
     * <p>Nothing happens when the writer reports there is nothing to flush ({@code beginFlush()}
     * returns {@code false}) or when {@code doFlush} yields no future. Otherwise the method waits
     * for the flush up to a deadline computed from {@code duration}; on success the task is
     * committed and the engine's record counter and last-commit timestamp are reset. On
     * interruption, failure or timeout the flush is cancelled.
     *
     * @param offsetStorageWriter writer holding the offsets to flush
     * @param duration maximum time to wait for the flush, measured from method entry
     * @param sourceTask task to commit once the flush has completed
     * @throws InterruptedException if interrupted while the current thread is still registered
     *         as the engine's running thread
     */
    protected void commitOffsets(OffsetStorageWriter offsetStorageWriter, Duration duration, SourceTask sourceTask) throws InterruptedException {
        final long deadlineMillis = this.clock.currentTimeInMillis() + duration.toMillis();
        if (!offsetStorageWriter.beginFlush()) {
            return;
        }
        final Future<?> pendingFlush = offsetStorageWriter.doFlush(this::completedFlush);
        if (pendingFlush == null) {
            return;
        }
        try {
            // Wait only for whatever is left of the budget; never pass a negative timeout.
            final long remainingMillis = Math.max(deadlineMillis - this.clock.currentTimeInMillis(), 0L);
            pendingFlush.get(remainingMillis, TimeUnit.MILLISECONDS);
            sourceTask.commit();
            this.recordsSinceLastCommit = 0L;
            this.timeOfLastCommitMillis = this.clock.currentTimeInMillis();
        } catch (InterruptedException interrupted) {
            LOGGER.warn("Flush of {} offsets interrupted, cancelling", this);
            offsetStorageWriter.cancelFlush();
            // Re-raise only while this thread is still the registered running thread;
            // otherwise the interruption is absorbed here.
            if (this.runningThread.get() == Thread.currentThread()) {
                Thread.currentThread().interrupt();
                throw interrupted;
            }
        } catch (ExecutionException failure) {
            LOGGER.error("Flush of {} offsets threw an unexpected exception: ", this, failure);
            offsetStorageWriter.cancelFlush();
        } catch (TimeoutException timedOut) {
            LOGGER.error("Timed out waiting to flush {} offsets to storage", this);
            offsetStorageWriter.cancelFlush();
        }
    }

    /**
     * Completion callback for an offset flush; only logs the outcome.
     *
     * @param th the failure reported for the flush, or {@code null} when none was reported
     * @param r7 unused result placeholder
     */
    protected void completedFlush(Throwable th, Void r7) {
        if (th == null) {
            LOGGER.trace("Finished flushing {} offsets to storage", this);
            return;
        }
        LOGGER.error("Failed to flush {} offsets to storage: ", this, th);
    }

    /**
     * Requests the engine to stop.
     *
     * <p>Clears the registered running thread, waits on the engine's latch for up to the time
     * configured by {@code EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS}, and then
     * interrupts the thread that was registered.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is cut short and the
     * engine thread is still interrupted; the caller's interrupt status is restored before this
     * method returns instead of being silently discarded.
     *
     * @return {@code true} if a running thread was registered and has been interrupted,
     *         {@code false} if no running thread was registered
     */
    public boolean stop() {
        LOGGER.info("Stopping the embedded engine");
        final Thread engineThread = this.runningThread.getAndSet(null);
        if (engineThread == null) {
            return false;
        }
        boolean callerInterrupted = false;
        try {
            final Duration timeout = Duration.ofMillis(this.config.getLong(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS));
            LOGGER.info("Waiting for {} for connector to stop", timeout);
            this.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Catching the exception cleared the caller's interrupt status; remember it so it
            // can be restored once the shutdown steps below are done.
            callerInterrupted = true;
        }
        LOGGER.debug("Interrupting the embedded engine's thread {} (already interrupted: {})", engineThread, engineThread.isInterrupted());
        engineThread.interrupt();
        if (callerInterrupted) {
            // Restored last so the logging and interrupt calls above run with a clear flag.
            Thread.currentThread().interrupt();
        }
        return true;
    }

    /**
     * Closes the engine by delegating to {@link #stop()}; the boolean result of
     * {@code stop()} is discarded.
     *
     * @throws IOException declared by the signature; the body itself only calls {@code stop()}
     */
    public void close() throws IOException {
        this.stop();
    }

    /**
     * Waits on this engine's latch, delegating to its {@code await(long, TimeUnit)}.
     *
     * @param j the maximum time to wait
     * @param timeUnit the unit of {@code j}
     * @return whatever the latch's {@code await} call returns
     * @throws InterruptedException if the latch's {@code await} call throws it
     */
    public boolean await(long j, TimeUnit timeUnit) throws InterruptedException {
        final boolean result = this.latch.await(j, timeUnit);
        return result;
    }

    /**
     * Describes this engine by the value configured for {@code EmbeddedEngineConfig.ENGINE_NAME}.
     *
     * @return a string of the form {@code EmbeddedEngine{id=<name>}}
     */
    @Override
    public String toString() {
        final String engineName = this.config.getString(EmbeddedEngineConfig.ENGINE_NAME);
        return "EmbeddedEngine{id=" + engineName + "}";
    }

    /**
     * Hands this engine's current {@code task} field to the supplied consumer.
     *
     * @param consumer callback invoked once with the value of the {@code task} field
     */
    public void runWithTask(Consumer<SourceTask> consumer) {
        final SourceTask currentTask = this.task;
        consumer.accept(currentTask);
    }

    /**
     * Creates the delay strategy returned by {@code DelayStrategy.exponential} from the
     * retry-delay settings of the given configuration.
     *
     * @param configuration source of {@code ERRORS_RETRY_DELAY_INITIAL_MS} and
     *        {@code ERRORS_RETRY_DELAY_MAX_MS}, both read as integer milliseconds
     * @return the strategy built from the initial and maximum delay
     */
    private DelayStrategy delayStrategy(Configuration configuration) {
        final Duration initialDelay = Duration.ofMillis(configuration.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_INITIAL_MS));
        final Duration maxDelay = Duration.ofMillis(configuration.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_MAX_MS));
        return DelayStrategy.exponential(initialDelay, maxDelay);
    }

    // Static initialization: assigns the class-level $assertionsDisabled flag and LOGGER.
    static {
        // The flag is the inverse of this class's desired assertion status.
        // NOTE(review): the `$assertionsDisabled` name looks like a compiler-synthesized field
        // surfaced by decompilation — confirm before editing it by hand.
        $assertionsDisabled = !EmbeddedEngine.class.desiredAssertionStatus();
        // Logger obtained from LoggerFactory for EmbeddedEngine.class.
        LOGGER = LoggerFactory.getLogger(EmbeddedEngine.class);
    }
}
