package io.debezium.embedded;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.embedded.spi.OffsetCommitPolicy;
import io.debezium.util.Clock;
import io.debezium.util.VariableLatch;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/debezium/embedded/EmbeddedEngine.class */
public final class EmbeddedEngine implements Runnable {
    // Configuration fields. Their values are assigned outside the portion of the file visible here.

    /** Engine name; {@link #run()} reads it and hands it to the offset storage writer and reader. */
    public static final Field ENGINE_NAME;
    /** Class name that {@link #run()} loads and instantiates as the {@code SourceConnector}. */
    public static final Field CONNECTOR_CLASS;
    /** Class name that {@link #run()} loads and instantiates as the {@code OffsetBackingStore}. */
    public static final Field OFFSET_STORAGE;
    /** Registered in the "file" group of {@code EmbeddedConfig}. */
    public static final Field OFFSET_STORAGE_FILE_FILENAME;
    /** Registered in the "kafka" group of {@code EmbeddedConfig}. */
    public static final Field OFFSET_STORAGE_KAFKA_TOPIC;
    /** Registered in the "kafka" group of {@code EmbeddedConfig}. */
    public static final Field OFFSET_STORAGE_KAFKA_PARTITIONS;
    /** Registered in the "kafka" group of {@code EmbeddedConfig}. */
    public static final Field OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR;
    // NOTE(review): not referenced in the visible part of this file; presumably the offset flush interval in milliseconds - confirm.
    public static final Field OFFSET_FLUSH_INTERVAL_MS;
    /** Read as a {@code long} by {@link #run()} and passed on to the flush/commit helpers. */
    public static final Field OFFSET_COMMIT_TIMEOUT_MS;
    /** Used by {@link #run()} to instantiate an {@code OffsetCommitPolicy} when the builder supplied none. */
    public static final Field OFFSET_COMMIT_POLICY;
    /** Converter class instantiated by the constructor as {@code keyConverter}. */
    protected static final Field INTERNAL_KEY_CONVERTER_CLASS;
    /** Converter class instantiated by the constructor as {@code valueConverter}. */
    protected static final Field INTERNAL_VALUE_CONVERTER_CLASS;
    /** Field set that {@link #run()} validates before starting the connector. */
    public static final Field.Set CONNECTOR_FIELDS;
    /** Field set the constructor extracts from the configuration to build the worker configuration. */
    protected static final Field.Set ALL_FIELDS;

    // Per-instance state.

    private final Logger logger;
    /** Configuration supplied through the builder. */
    private final Configuration config;
    /** Time source; {@link #run()} uses it to stamp {@code timeOfLastCommitMillis}. */
    private final Clock clock;
    /** Loader used to load the connector and offset-store classes and handed to the converter instantiation. */
    private final ClassLoader classLoader;
    /** Application handler invoked by {@link #run()} with each polled {@code SourceRecord}. */
    private final Consumer<SourceRecord> consumer;
    /** Receives the final outcome held in {@code completionResult}; never null after construction. */
    private final CompletionCallback completionCallback;
    /** Optional lifecycle listener; may be null ({@link #run()} wraps it in an {@code Optional}). */
    private final ConnectorCallback connectorCallback;
    /** Thread currently executing {@link #run()}, or null when the engine is not running. */
    private final AtomicReference<Thread> runningThread;
    /** Counted up when {@link #run()} starts and counted down on each of its exit paths. */
    private final VariableLatch latch;
    private final Converter keyConverter;
    private final Converter valueConverter;
    /** Worker configuration passed to the offset backing store's {@code configure}. */
    private final WorkerConfig workerConfig;
    /** Accumulates the success/failure outcome reported through {@code fail} and {@code succeed}. */
    private final CompletionResult completionResult;
    /** Incremented for every record handled in {@link #run()}; reset to 0 when the task starts. */
    private long recordsSinceLastCommit;
    /** Set from {@code clock} just before the polling loop in {@link #run()} begins. */
    private long timeOfLastCommitMillis;
    /** Policy from the builder, or instantiated lazily in {@link #run()} when the builder gave none. */
    private OffsetCommitPolicy offsetCommitPolicy;
    // Compiler-generated assertion switch (decompiler artefact); guards the null checks in the constructor.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$Builder.class */
    /**
     * Fluent builder for {@link EmbeddedEngine} instances. Every setter returns a {@code Builder}
     * so that calls can be chained; {@link #build()} produces the engine.
     */
    public interface Builder {
        /**
         * Sets the configuration for the engine and its connector. The builder returned by
         * {@code EmbeddedEngine.create()} rejects a missing configuration in {@link #build()}.
         */
        Builder using(Configuration configuration);

        /**
         * Sets the handler that receives each source record. The builder returned by
         * {@code EmbeddedEngine.create()} rejects a missing handler in {@link #build()}.
         */
        Builder notifying(Consumer<SourceRecord> consumer);

        /** Sets the class loader used to load the connector and offset-store classes. */
        Builder using(ClassLoader classLoader);

        /** Sets the clock the engine uses as its time source. */
        Builder using(Clock clock);

        /** Sets the callback that is told the final outcome of the engine run. */
        Builder using(CompletionCallback completionCallback);

        /** Sets the optional listener for connector and task lifecycle events. */
        Builder using(ConnectorCallback connectorCallback);

        /** Sets the policy that decides when offsets are committed. */
        Builder using(OffsetCommitPolicy offsetCommitPolicy);

        /** Creates the engine from the values collected so far. */
        EmbeddedEngine build();
    }

    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$CompletionCallback.class */
    /**
     * Callback told the final outcome of an engine run.
     *
     * <p>Marked {@link FunctionalInterface} because the engine itself supplies a lambda for it;
     * the annotation makes the compiler reject any change that would break lambda usage.
     */
    @FunctionalInterface
    public interface CompletionCallback {
        /**
         * Reports the outcome.
         *
         * @param z   {@code true} when the run succeeded, {@code false} when it failed
         * @param str human-readable description of the outcome
         * @param th  the error that caused a failure; may be {@code null}
         */
        void handle(boolean z, String str, Throwable th);
    }

    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$CompletionResult.class */
    /**
     * A {@link CompletionCallback} that remembers the reported outcome, lets other threads wait
     * for it, and optionally forwards it to a delegate callback.
     *
     * <p>The outcome fields are {@code volatile} because {@link #success()}, {@link #message()},
     * {@link #error()} and {@link #hasError()} may be called from a thread that has not waited on
     * the latch, in which case nothing else guarantees visibility of the values written by
     * {@link #handle(boolean, String, Throwable)}.
     */
    public static class CompletionResult implements CompletionCallback {
        private final CompletionCallback delegate;
        private final CountDownLatch completed;
        private volatile boolean success;
        private volatile String message;
        private volatile Throwable error;

        /** Creates a result holder without a delegate. */
        public CompletionResult() {
            this(null);
        }

        /**
         * Creates a result holder that forwards every outcome to the given callback.
         *
         * @param completionCallback the delegate; may be {@code null}
         */
        public CompletionResult(CompletionCallback completionCallback) {
            this.completed = new CountDownLatch(1);
            this.delegate = completionCallback;
        }

        /**
         * Stores the outcome, releases any waiting threads, and then notifies the delegate (if any).
         *
         * @param z   whether the run succeeded
         * @param str description of the outcome
         * @param th  the failure cause; may be {@code null}
         */
        @Override // io.debezium.embedded.EmbeddedEngine.CompletionCallback
        public void handle(boolean z, String str, Throwable th) {
            this.success = z;
            this.message = str;
            this.error = th;
            // Release waiters only after all three fields are written.
            this.completed.countDown();
            if (this.delegate != null) {
                this.delegate.handle(z, str, th);
            }
        }

        /**
         * Blocks until an outcome has been reported.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public void await() throws InterruptedException {
            this.completed.await();
        }

        /**
         * Blocks until an outcome has been reported or the timeout elapses.
         *
         * @param j        maximum time to wait
         * @param timeUnit unit of {@code j}
         * @return {@code true} if an outcome was reported within the timeout
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public boolean await(long j, TimeUnit timeUnit) throws InterruptedException {
            return this.completed.await(j, timeUnit);
        }

        /** @return {@code true} once {@code handle} has been called at least once */
        public boolean hasCompleted() {
            return this.completed.getCount() == 0;
        }

        /** @return the success flag from the most recent {@code handle} call ({@code false} before any call) */
        public boolean success() {
            return this.success;
        }

        /** @return the message from the most recent {@code handle} call, or {@code null} */
        public String message() {
            return this.message;
        }

        /** @return the error from the most recent {@code handle} call, or {@code null} */
        public Throwable error() {
            return this.error;
        }

        /** @return {@code true} if the most recent {@code handle} call carried a non-null error */
        public boolean hasError() {
            return this.error != null;
        }
    }

    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$ConnectorCallback.class */
    /**
     * Listener for connector and task lifecycle events. Every method has an empty default
     * implementation, so implementors override only the events they care about.
     */
    public interface ConnectorCallback {
        /** Called after the connector has been started. Does nothing by default. */
        default void connectorStarted() {
            // intentionally empty
        }

        /** Called after the connector has been stopped. Does nothing by default. */
        default void connectorStopped() {
            // intentionally empty
        }

        /** Called after the connector's task has been started. Does nothing by default. */
        default void taskStarted() {
            // intentionally empty
        }

        /** Called after the connector's task has been stopped. Does nothing by default. */
        default void taskStopped() {
            // intentionally empty
        }
    }

    /* loaded from: input_file:io/debezium/embedded/EmbeddedEngine$EmbeddedConfig.class */
    /**
     * {@link WorkerConfig} whose definition is the base worker definition extended with the
     * engine's file and Kafka offset-storage fields.
     */
    protected static class EmbeddedConfig extends WorkerConfig {
        private static final ConfigDef CONFIG;

        static {
            ConfigDef definition = baseConfigDef();
            Field.group(definition, "file", new Field[]{EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME});
            // Each Kafka field is registered with its own call, in this order.
            Field[] kafkaFields = {
                EmbeddedEngine.OFFSET_STORAGE_KAFKA_TOPIC,
                EmbeddedEngine.OFFSET_STORAGE_KAFKA_PARTITIONS,
                EmbeddedEngine.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR
            };
            for (Field kafkaField : kafkaFields) {
                Field.group(definition, "kafka", new Field[]{kafkaField});
            }
            CONFIG = definition;
        }

        /**
         * Creates the configuration from the given properties.
         *
         * @param map the worker properties
         */
        protected EmbeddedConfig(Map<String, String> map) {
            super(CONFIG, map);
        }
    }

    /**
     * Returns a new builder for an {@link EmbeddedEngine}.
     *
     * <p>On {@code build()} the builder falls back to its own class loader and to
     * {@code Clock.system()} when none were supplied, and throws a {@link NullPointerException}
     * if the configuration or the record consumer is missing.
     *
     * @return a fresh builder; never {@code null}
     */
    public static Builder create() {
        return new Builder() {
            private Configuration config;
            private Consumer<SourceRecord> consumer;
            private ClassLoader classLoader;
            private Clock clock;
            private CompletionCallback completionCallback;
            private ConnectorCallback connectorCallback;
            private OffsetCommitPolicy offsetCommitPolicy;

            @Override
            public Builder notifying(Consumer<SourceRecord> consumer) {
                this.consumer = consumer;
                return this;
            }

            @Override
            public Builder using(Configuration configuration) {
                this.config = configuration;
                return this;
            }

            @Override
            public Builder using(ClassLoader classLoader) {
                this.classLoader = classLoader;
                return this;
            }

            @Override
            public Builder using(Clock clock) {
                this.clock = clock;
                return this;
            }

            @Override
            public Builder using(CompletionCallback completionCallback) {
                this.completionCallback = completionCallback;
                return this;
            }

            @Override
            public Builder using(ConnectorCallback connectorCallback) {
                this.connectorCallback = connectorCallback;
                return this;
            }

            @Override
            public Builder using(OffsetCommitPolicy offsetCommitPolicy) {
                this.offsetCommitPolicy = offsetCommitPolicy;
                return this;
            }

            @Override
            public EmbeddedEngine build() {
                // Apply defaults first, then validate the mandatory settings.
                this.classLoader = this.classLoader != null ? this.classLoader : getClass().getClassLoader();
                this.clock = this.clock != null ? this.clock : Clock.system();
                Objects.requireNonNull(this.config, "A connector configuration must be specified.");
                Objects.requireNonNull(this.consumer, "A connector consumer must be specified.");
                return new EmbeddedEngine(
                        this.config,
                        this.classLoader,
                        this.clock,
                        this.consumer,
                        this.completionCallback,
                        this.connectorCallback,
                        this.offsetCommitPolicy);
            }
        };
    }

    private EmbeddedEngine(Configuration configuration, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer, CompletionCallback completionCallback, ConnectorCallback connectorCallback, OffsetCommitPolicy offsetCommitPolicy) {
        this.logger = LoggerFactory.getLogger(getClass());
        this.runningThread = new AtomicReference<>();
        this.latch = new VariableLatch(0);
        this.recordsSinceLastCommit = 0L;
        this.timeOfLastCommitMillis = 0L;
        this.config = configuration;
        this.consumer = consumer;
        this.classLoader = classLoader;
        this.clock = clock;
        this.completionCallback = completionCallback != null ? completionCallback : (z, str, th) -> {
            if (z) {
                return;
            }
            this.logger.error(str, th);
        };
        this.connectorCallback = connectorCallback;
        this.completionResult = new CompletionResult();
        this.offsetCommitPolicy = offsetCommitPolicy;
        if (!$assertionsDisabled && this.config == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.consumer == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.classLoader == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.clock == null) {
            throw new AssertionError();
        }
        this.keyConverter = (Converter) configuration.getInstance(INTERNAL_KEY_CONVERTER_CLASS, Converter.class, () -> {
            return this.classLoader;
        });
        this.keyConverter.configure(configuration.subset(INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true);
        this.valueConverter = (Converter) configuration.getInstance(INTERNAL_VALUE_CONVERTER_CLASS, Converter.class, () -> {
            return this.classLoader;
        });
        this.valueConverter.configure((this.valueConverter instanceof JsonConverter ? configuration.edit().with(INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false).build() : configuration).subset(INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(), false);
        Map asMap = configuration.asMap(ALL_FIELDS);
        asMap.put("key.converter", JsonConverter.class.getName());
        asMap.put("value.converter", JsonConverter.class.getName());
        this.workerConfig = new EmbeddedConfig(asMap);
    }

    /**
     * Tells whether a thread is currently registered as executing this engine.
     *
     * @return {@code true} while a running thread is recorded, {@code false} otherwise
     */
    public boolean isRunning() {
        final Thread active = this.runningThread.get();
        return active != null;
    }

    /**
     * Reports a failure that has no underlying exception.
     *
     * @param str description of the failure
     */
    private void fail(String str) {
        fail(str, (Throwable) null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reports a failure. The first failure is stored in the completion result; once an error
     * has been stored, later failures are only logged.
     *
     * @param str description of the failure
     * @param th  the cause; may be {@code null}
     */
    public void fail(String str, Throwable th) {
        if (!this.completionResult.hasError()) {
            this.completionResult.handle(false, str, th);
            return;
        }
        this.logger.error(str, th);
    }

    /**
     * Records a successful outcome in the completion result.
     *
     * @param str description of the outcome
     */
    private void succeed(String str) {
        final Throwable noError = null;
        this.completionResult.handle(true, str, noError);
    }

    /* JADX WARN: Finally extract failed */
    @Override // java.lang.Runnable
    public void run() {
        SourceTask sourceTask;
        Throwable th;
        if (this.runningThread.compareAndSet(null, Thread.currentThread())) {
            String string = this.config.getString(ENGINE_NAME);
            String string2 = this.config.getString(CONNECTOR_CLASS);
            Optional ofNullable = Optional.ofNullable(this.connectorCallback);
            this.latch.countUp();
            try {
                Configuration configuration = this.config;
                Field.Set set = CONNECTOR_FIELDS;
                Logger logger = this.logger;
                logger.getClass();
                if (!configuration.validateAndRecord(set, logger::error)) {
                    fail("Failed to start connector with invalid configuration (see logs for actual errors)");
                    this.latch.countDown();
                    this.runningThread.set(null);
                    this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                    return;
                }
                try {
                    SourceConnector sourceConnector = (SourceConnector) this.classLoader.loadClass(string2).newInstance();
                    String string3 = this.config.getString(OFFSET_STORAGE);
                    try {
                        OffsetBackingStore offsetBackingStore = (OffsetBackingStore) this.classLoader.loadClass(string3).newInstance();
                        try {
                            offsetBackingStore.configure(this.workerConfig);
                            offsetBackingStore.start();
                            if (this.offsetCommitPolicy == null) {
                                this.offsetCommitPolicy = (OffsetCommitPolicy) this.config.getInstance(OFFSET_COMMIT_POLICY, OffsetCommitPolicy.class, this.config);
                            }
                            try {
                                sourceConnector.initialize(new ConnectorContext() { // from class: io.debezium.embedded.EmbeddedEngine.2
                                    public void requestTaskReconfiguration() {
                                    }

                                    public void raiseError(Exception exc) {
                                        EmbeddedEngine.this.fail(exc.getMessage(), exc);
                                    }
                                });
                                OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, string, this.keyConverter, this.valueConverter);
                                final OffsetStorageReaderImpl offsetStorageReaderImpl = new OffsetStorageReaderImpl(offsetBackingStore, string, this.keyConverter, this.valueConverter);
                                long j = this.config.getLong(OFFSET_COMMIT_TIMEOUT_MS);
                                try {
                                    sourceConnector.start(this.config.asMap());
                                    ofNullable.ifPresent((v0) -> {
                                        v0.connectorStarted();
                                    });
                                    List taskConfigs = sourceConnector.taskConfigs(1);
                                    Class taskClass = sourceConnector.taskClass();
                                    try {
                                        sourceTask = (SourceTask) taskClass.newInstance();
                                        try {
                                            sourceTask.initialize(new SourceTaskContext() { // from class: io.debezium.embedded.EmbeddedEngine.3
                                                public OffsetStorageReader offsetStorageReader() {
                                                    return offsetStorageReaderImpl;
                                                }

                                                public Map<String, String> configs() {
                                                    return null;
                                                }
                                            });
                                            sourceTask.start((Map) taskConfigs.get(0));
                                            ofNullable.ifPresent((v0) -> {
                                                v0.taskStarted();
                                            });
                                            this.recordsSinceLastCommit = 0L;
                                            th = null;
                                        } catch (Throwable th2) {
                                            fail("Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: " + Configuration.from((Map) taskConfigs.get(0)).withMaskedPasswords(), th2);
                                            try {
                                                try {
                                                    offsetBackingStore.stop();
                                                    try {
                                                        sourceConnector.stop();
                                                        ofNullable.ifPresent((v0) -> {
                                                            v0.connectorStopped();
                                                        });
                                                    } catch (Throwable th3) {
                                                        fail("Error while trying to stop connector class '" + string2 + "'", th3);
                                                    }
                                                } catch (Throwable th4) {
                                                    fail("Error while trying to stop the offset store", th4);
                                                    try {
                                                        sourceConnector.stop();
                                                        ofNullable.ifPresent((v0) -> {
                                                            v0.connectorStopped();
                                                        });
                                                    } catch (Throwable th5) {
                                                        fail("Error while trying to stop connector class '" + string2 + "'", th5);
                                                    }
                                                    this.latch.countDown();
                                                    this.runningThread.set(null);
                                                    this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                                    return;
                                                }
                                                this.latch.countDown();
                                                this.runningThread.set(null);
                                                this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                                return;
                                            } finally {
                                                try {
                                                    sourceConnector.stop();
                                                    ofNullable.ifPresent((v0) -> {
                                                        v0.connectorStopped();
                                                    });
                                                } catch (Throwable th6) {
                                                    fail("Error while trying to stop connector class '" + string2 + "'", th6);
                                                }
                                            }
                                        }
                                    } catch (IllegalAccessException | InstantiationException e) {
                                        try {
                                            fail("Unable to instantiate connector's task class '" + taskClass.getName() + "'", e);
                                            try {
                                                offsetBackingStore.stop();
                                                try {
                                                    sourceConnector.stop();
                                                    ofNullable.ifPresent((v0) -> {
                                                        v0.connectorStopped();
                                                    });
                                                } catch (Throwable th7) {
                                                    fail("Error while trying to stop connector class '" + string2 + "'", th7);
                                                }
                                            } catch (Throwable th8) {
                                                fail("Error while trying to stop the offset store", th8);
                                                try {
                                                    sourceConnector.stop();
                                                    ofNullable.ifPresent((v0) -> {
                                                        v0.connectorStopped();
                                                    });
                                                } catch (Throwable th9) {
                                                    fail("Error while trying to stop connector class '" + string2 + "'", th9);
                                                }
                                                this.latch.countDown();
                                                this.runningThread.set(null);
                                                this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                                return;
                                            }
                                            this.latch.countDown();
                                            this.runningThread.set(null);
                                            this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                            return;
                                        } finally {
                                            try {
                                                sourceConnector.stop();
                                                ofNullable.ifPresent((v0) -> {
                                                    v0.connectorStopped();
                                                });
                                            } catch (Throwable th10) {
                                                fail("Error while trying to stop connector class '" + string2 + "'", th10);
                                            }
                                        }
                                    }
                                } catch (Throwable th11) {
                                    fail("Error while trying to run connector class '" + string2 + "'", th11);
                                    try {
                                        try {
                                            offsetBackingStore.stop();
                                            try {
                                                sourceConnector.stop();
                                                ofNullable.ifPresent((v0) -> {
                                                    v0.connectorStopped();
                                                });
                                            } catch (Throwable th12) {
                                                fail("Error while trying to stop connector class '" + string2 + "'", th12);
                                            }
                                        } catch (Throwable th13) {
                                            fail("Error while trying to stop the offset store", th13);
                                            try {
                                                sourceConnector.stop();
                                                ofNullable.ifPresent((v0) -> {
                                                    v0.connectorStopped();
                                                });
                                            } catch (Throwable th14) {
                                                fail("Error while trying to stop connector class '" + string2 + "'", th14);
                                            }
                                            this.latch.countDown();
                                            this.runningThread.set(null);
                                            this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                        }
                                    } finally {
                                        try {
                                            sourceConnector.stop();
                                            ofNullable.ifPresent((v0) -> {
                                                v0.connectorStopped();
                                            });
                                        } catch (Throwable th15) {
                                            fail("Error while trying to stop connector class '" + string2 + "'", th15);
                                        }
                                    }
                                }
                                try {
                                    this.timeOfLastCommitMillis = this.clock.currentTimeInMillis();
                                    boolean z = true;
                                    while (this.runningThread.get() != null && th == null && z) {
                                        try {
                                            try {
                                                this.logger.debug("Embedded engine is polling task for records on thread " + this.runningThread.get());
                                                List<SourceRecord> poll = sourceTask.poll();
                                                this.logger.debug("Embedded engine returned from polling task for records");
                                                if (poll != null) {
                                                    try {
                                                        if (!poll.isEmpty()) {
                                                            this.logger.debug("Received {} records from the task", Integer.valueOf(poll.size()));
                                                            for (SourceRecord sourceRecord : poll) {
                                                                try {
                                                                    this.consumer.accept(sourceRecord);
                                                                    sourceTask.commitRecord(sourceRecord);
                                                                    offsetStorageWriter.offset(sourceRecord.sourcePartition(), sourceRecord.sourceOffset());
                                                                    this.recordsSinceLastCommit++;
                                                                } catch (StopConnectorException e2) {
                                                                    z = false;
                                                                    offsetStorageWriter.offset(sourceRecord.sourcePartition(), sourceRecord.sourceOffset());
                                                                    this.recordsSinceLastCommit++;
                                                                } catch (Throwable th16) {
                                                                    th = th16;
                                                                }
                                                            }
                                                            maybeFlush(offsetStorageWriter, this.offsetCommitPolicy, j, sourceTask);
                                                            maybeFlush(offsetStorageWriter, this.offsetCommitPolicy, j, sourceTask);
                                                        }
                                                    } catch (Throwable th17) {
                                                        if (th == null) {
                                                            th = th17;
                                                        }
                                                        maybeFlush(offsetStorageWriter, this.offsetCommitPolicy, j, sourceTask);
                                                    }
                                                }
                                                this.logger.debug("Received no records from the task");
                                                maybeFlush(offsetStorageWriter, this.offsetCommitPolicy, j, sourceTask);
                                            } catch (InterruptedException e3) {
                                                this.logger.debug("Embedded engine interrupted on thread " + this.runningThread.get() + " while polling the task for records");
                                                Thread.interrupted();
                                                maybeFlush(offsetStorageWriter, this.offsetCommitPolicy, j, sourceTask);
                                            }
                                        } catch (Throwable th18) {
                                            maybeFlush(offsetStorageWriter, this.offsetCommitPolicy, j, sourceTask);
                                            throw th18;
                                        }
                                    }
                                    if (th != null) {
                                        fail("Stopping connector after error in the application's handler method: " + th.getMessage(), th);
                                    }
                                    try {
                                        this.logger.debug("Stopping the task and engine");
                                        sourceTask.stop();
                                        ofNullable.ifPresent((v0) -> {
                                            v0.taskStopped();
                                        });
                                        commitOffsets(offsetStorageWriter, j, sourceTask);
                                        if (th == null) {
                                            succeed("Connector '" + string2 + "' completed normally.");
                                        }
                                    } catch (Throwable th19) {
                                        fail("Error while trying to stop the task and commit the offsets", th19);
                                    }
                                    try {
                                        try {
                                            offsetBackingStore.stop();
                                            try {
                                                sourceConnector.stop();
                                                ofNullable.ifPresent((v0) -> {
                                                    v0.connectorStopped();
                                                });
                                            } catch (Throwable th20) {
                                                fail("Error while trying to stop connector class '" + string2 + "'", th20);
                                            }
                                        } catch (Throwable th21) {
                                            throw th21;
                                        }
                                    } catch (Throwable th22) {
                                        fail("Error while trying to stop the offset store", th22);
                                        try {
                                            sourceConnector.stop();
                                            ofNullable.ifPresent((v0) -> {
                                                v0.connectorStopped();
                                            });
                                        } catch (Throwable th23) {
                                            fail("Error while trying to stop connector class '" + string2 + "'", th23);
                                        }
                                    }
                                    this.latch.countDown();
                                    this.runningThread.set(null);
                                    this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                                } catch (Throwable th24) {
                                    if (0 != 0) {
                                        fail("Stopping connector after error in the application's handler method: " + th.getMessage(), null);
                                    }
                                    try {
                                        this.logger.debug("Stopping the task and engine");
                                        sourceTask.stop();
                                        ofNullable.ifPresent((v0) -> {
                                            v0.taskStopped();
                                        });
                                        commitOffsets(offsetStorageWriter, j, sourceTask);
                                        if (0 == 0) {
                                            succeed("Connector '" + string2 + "' completed normally.");
                                        }
                                    } catch (Throwable th25) {
                                        fail("Error while trying to stop the task and commit the offsets", th25);
                                    }
                                    throw th24;
                                }
                            } catch (Throwable th26) {
                                try {
                                    try {
                                        offsetBackingStore.stop();
                                        try {
                                            sourceConnector.stop();
                                            ofNullable.ifPresent((v0) -> {
                                                v0.connectorStopped();
                                            });
                                        } catch (Throwable th27) {
                                            fail("Error while trying to stop connector class '" + string2 + "'", th27);
                                        }
                                    } catch (Throwable th28) {
                                        fail("Error while trying to stop the offset store", th28);
                                        try {
                                            sourceConnector.stop();
                                            ofNullable.ifPresent((v0) -> {
                                                v0.connectorStopped();
                                            });
                                        } catch (Throwable th29) {
                                            fail("Error while trying to stop connector class '" + string2 + "'", th29);
                                        }
                                        throw th26;
                                    }
                                    throw th26;
                                } finally {
                                    try {
                                        sourceConnector.stop();
                                        ofNullable.ifPresent((v0) -> {
                                            v0.connectorStopped();
                                        });
                                    } catch (Throwable th30) {
                                        fail("Error while trying to stop connector class '" + string2 + "'", th30);
                                    }
                                }
                            }
                        } catch (Throwable th31) {
                            fail("Unable to configure and start the '" + string3 + "' offset backing store", th31);
                            this.latch.countDown();
                            this.runningThread.set(null);
                            this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                        }
                    } catch (Throwable th32) {
                        fail("Unable to instantiate OffsetBackingStore class '" + string3 + "'", th32);
                        this.latch.countDown();
                        this.runningThread.set(null);
                        this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                    }
                } catch (Throwable th33) {
                    fail("Unable to instantiate connector class '" + string2 + "'", th33);
                    this.latch.countDown();
                    this.runningThread.set(null);
                    this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                }
            } catch (Throwable th34) {
                this.latch.countDown();
                this.runningThread.set(null);
                this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                throw th34;
            }
        }
    }

    /**
     * Commits offsets only when the supplied commit policy asks for it.
     *
     * <p>The policy is consulted with the number of records seen since the last commit and the
     * time that has elapsed (per this engine's clock) since that commit.
     *
     * @param offsetStorageWriter the writer holding the offsets to flush
     * @param offsetCommitPolicy the policy deciding whether a commit should happen now
     * @param j the timeout value, in milliseconds, handed on to {@link #commitOffsets}
     * @param sourceTask the task handed on to {@link #commitOffsets}
     */
    protected void maybeFlush(OffsetStorageWriter offsetStorageWriter, OffsetCommitPolicy offsetCommitPolicy, long j, SourceTask sourceTask) {
        final long pendingRecords = this.recordsSinceLastCommit;
        final long elapsedMillis = this.clock.currentTimeInMillis() - this.timeOfLastCommitMillis;
        final Duration sinceLastCommit = Duration.ofMillis(elapsedMillis);
        if (!offsetCommitPolicy.performCommit(pendingRecords, sinceLastCommit)) {
            // The policy declined; leave the counters untouched for the next check.
            return;
        }
        commitOffsets(offsetStorageWriter, j, sourceTask);
    }

    /**
     * Flushes the writer's pending offsets and, if the flush completes in time, commits the task
     * and resets this engine's commit bookkeeping.
     *
     * <p>Nothing happens when the writer refuses to begin a flush or returns no future from
     * {@code doFlush}. If waiting for the flush is interrupted, fails, or times out, the flush is
     * cancelled on the writer and the bookkeeping is left unchanged. The interrupt flag is not
     * restored here after an {@link InterruptedException}.
     *
     * @param offsetStorageWriter the writer whose offsets are flushed
     * @param j the maximum time, in milliseconds, allowed from entry into this method until the
     *     flush must have completed
     * @param sourceTask the task whose {@code commit()} is invoked after a successful flush
     */
    protected void commitOffsets(OffsetStorageWriter offsetStorageWriter, long j, SourceTask sourceTask) {
        // Fix the deadline up front so time spent starting the flush counts against the budget.
        final long deadlineMillis = this.clock.currentTimeInMillis() + j;
        if (!offsetStorageWriter.beginFlush()) {
            return;
        }
        final Future<?> pendingFlush = offsetStorageWriter.doFlush(this::completedFlush);
        if (pendingFlush == null) {
            return;
        }
        try {
            // Never pass a negative wait, even if the deadline has already passed.
            final long remainingMillis = Math.max(deadlineMillis - this.clock.currentTimeInMillis(), 0L);
            pendingFlush.get(remainingMillis, TimeUnit.MILLISECONDS);
            sourceTask.commit();
            this.recordsSinceLastCommit = 0L;
            this.timeOfLastCommitMillis = this.clock.currentTimeInMillis();
        } catch (InterruptedException interrupted) {
            this.logger.warn("Flush of {} offsets interrupted, cancelling", this);
            offsetStorageWriter.cancelFlush();
        } catch (ExecutionException failed) {
            this.logger.error("Flush of {} offsets threw an unexpected exception: ", this, failed);
            offsetStorageWriter.cancelFlush();
        } catch (TimeoutException timedOut) {
            this.logger.error("Timed out waiting to flush {} offsets to storage", this);
            offsetStorageWriter.cancelFlush();
        }
    }

    /**
     * Callback handed to the offset writer's {@code doFlush}; it only logs the outcome.
     *
     * @param th the failure reported for the flush, or {@code null} when none was reported
     * @param r7 the (unused) flush result
     */
    protected void completedFlush(Throwable th, Void r7) {
        if (th == null) {
            this.logger.trace("Finished flushing {} offsets to storage", this);
            return;
        }
        this.logger.error("Failed to flush {} offsets to storage: ", this, th);
    }

    /**
     * Requests that the engine stop by clearing the recorded running thread and interrupting it.
     *
     * <p>The running-thread reference is swapped to {@code null} atomically, so only one caller
     * obtains the thread and interrupts it; every other call finds no thread and returns
     * {@code false}.
     *
     * @return {@code true} if a running thread was recorded and has been interrupted,
     *     {@code false} if no running thread was recorded
     */
    public boolean stop() {
        this.logger.debug("Stopping the embedded engine");
        final Thread engineThread = this.runningThread.getAndSet(null);
        if (engineThread == null) {
            return false;
        }
        // Parameterized form avoids building the message when debug logging is disabled.
        this.logger.debug("Interrupting the embedded engine's thread {} (already interrupted: {})", engineThread, engineThread.isInterrupted());
        engineThread.interrupt();
        return true;
    }

    /**
     * Blocks on this engine's latch for at most the given time.
     *
     * @param j the maximum time to wait
     * @param timeUnit the unit of {@code j}
     * @return whatever the latch's timed {@code await} returns (presumably {@code true} when the
     *     latch was released before the timeout; confirm against the latch implementation)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean await(long j, TimeUnit timeUnit) throws InterruptedException {
        final boolean released = this.latch.await(j, timeUnit);
        return released;
    }

    /**
     * Returns a short description of this engine containing the configured {@code ENGINE_NAME}
     * value. This text is what appears in place of {@code {}} in the offset-flush log messages
     * that pass {@code this} as an argument.
     *
     * @return a string of the form {@code EmbeddedConnector{id=<name>}}
     */
    @Override
    public String toString() {
        return "EmbeddedConnector{id=" + this.config.getString(ENGINE_NAME) + '}';
    }

    // Static initialization of the engine's configuration field definitions.
    // Order matters: the individual fields must be assigned before the field sets at the bottom
    // that reference them.
    static {
        // NOTE(review): looks like the compiler-synthesized assertion flag surfaced by decompilation;
        // kept because the field is declared outside this block.
        $assertionsDisabled = !EmbeddedEngine.class.desiredAssertionStatus();

        // Required connector identity settings.
        ENGINE_NAME = Field.create("name").withDescription("Unique name for this connector instance.").withValidation(new Field.Validator[]{Field::isRequired});
        CONNECTOR_CLASS = Field.create("connector.class").withDescription("The Java class for the connector").withValidation(new Field.Validator[]{Field::isRequired});

        // Offset storage settings; the default store is the file-based one.
        OFFSET_STORAGE = Field.create("offset.storage").withDescription("The Java class that implements the `OffsetBackingStore` interface, used to periodically store offsets so that, upon restart, the connector can resume where it last left off.").withDefault(FileOffsetBackingStore.class.getName());
        OFFSET_STORAGE_FILE_FILENAME = Field.create("offset.storage.file.filename").withDescription("The file where offsets are to be stored. Required when 'offset.storage' is set to the " + FileOffsetBackingStore.class.getName() + " class.").withDefault("");
        OFFSET_STORAGE_KAFKA_TOPIC = Field.create("offset.storage.topic").withDescription("The name of the Kafka topic where offsets are to be stored. Required with other properties when 'offset.storage' is set to the " + KafkaOffsetBackingStore.class.getName() + " class.").withDefault("");
        OFFSET_STORAGE_KAFKA_PARTITIONS = Field.create("offset.storage.partitions").withType(ConfigDef.Type.INT).withDescription("The number of partitions used when creating the offset storage topic. Required with other properties when 'offset.storage' is set to the " + KafkaOffsetBackingStore.class.getName() + " class.");
        OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR = Field.create("offset.storage.replication.factor").withType(ConfigDef.Type.SHORT).withDescription("Replication factor used when creating the offset storage topic. Required with other properties when 'offset.storage' is set to the " + KafkaOffsetBackingStore.class.getName() + " class.");

        // Offset commit timing and policy settings.
        OFFSET_FLUSH_INTERVAL_MS = Field.create("offset.flush.interval.ms").withDescription("Interval at which to try committing offsets. The default is 1 minute.").withDefault(60000L).withValidation(new Field.Validator[]{Field::isNonNegativeInteger});
        OFFSET_COMMIT_TIMEOUT_MS = Field.create("offset.flush.timeout.ms").withDescription("Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt.").withDefault(5000L).withValidation(new Field.Validator[]{Field::isPositiveInteger});
        OFFSET_COMMIT_POLICY = Field.create("offset.commit.policy").withDescription("The fully-qualified class name of the commit policy type. This class must implement the interface " + OffsetCommitPolicy.class.getName() + ". The default is a periodic commit policy based upon time intervals.").withDefault(OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName()).withValidation(new Field.Validator[]{Field::isClassName});

        // Converters used to (de)serialize offset keys and values; both default to the JSON converter.
        INTERNAL_KEY_CONVERTER_CLASS = Field.create("internal.key.converter").withDescription("The Converter class that should be used to serialize and deserialize key data for offsets.").withDefault(JsonConverter.class.getName());
        INTERNAL_VALUE_CONVERTER_CLASS = Field.create("internal.value.converter").withDescription("The Converter class that should be used to serialize and deserialize value data for offsets.").withDefault(JsonConverter.class.getName());

        // Field sets built from the definitions above.
        CONNECTOR_FIELDS = Field.setOf(new Field[]{ENGINE_NAME, CONNECTOR_CLASS});
        // NOTE(review): the Kafka offset-storage fields and OFFSET_COMMIT_POLICY are not part of
        // ALL_FIELDS; left unchanged here — confirm whether that omission is intentional.
        ALL_FIELDS = CONNECTOR_FIELDS.with(new Field[]{OFFSET_STORAGE, OFFSET_STORAGE_FILE_FILENAME, OFFSET_FLUSH_INTERVAL_MS, OFFSET_COMMIT_TIMEOUT_MS, INTERNAL_KEY_CONVERTER_CLASS, INTERNAL_VALUE_CONVERTER_CLASS});
    }
}
