package io.castled.apps.connectors.kafka;

import com.google.common.collect.Sets;
import io.castled.ObjectRegistry;
import io.castled.apps.DataSink;
import io.castled.apps.models.DataSinkRequest;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.kafka.producer.CastledKafkaProducer;
import io.castled.kafka.producer.CastledProducerCallback;
import io.castled.kafka.producer.KafkaProducerConfiguration;
import io.castled.schema.models.Message;
import io.castled.utils.MessageUtils;
import java.util.Collections;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/* loaded from: input_file:io/castled/apps/connectors/kafka/KafkaDataSink.class */
/**
 * {@link DataSink} that publishes every message of a sync request to a Kafka topic.
 *
 * <p>Progress is tracked with three pieces of shared state:
 * <ul>
 *   <li>{@code recordsProcessed} - number of messages that were either acknowledged by the producer
 *       callback or rejected synchronously and written to the error stream;</li>
 *   <li>{@code pendingMessageIds} - offsets that were handed to the producer and have not been
 *       acknowledged through {@link DataSinkCallback#onSuccess};</li>
 *   <li>{@code lastBufferedOffset} - offset of the last message passed to the producer.</li>
 * </ul>
 *
 * <p>NOTE(review): the producer callbacks and {@link #getSyncStats()} are presumably invoked from
 * threads other than the one running {@link #syncRecords}; all shared fields are therefore kept
 * atomic, concurrent or volatile.
 */
public class KafkaDataSink implements DataSink {
    private final AtomicLong recordsProcessed = new AtomicLong(0);
    private final Set<Long> pendingMessageIds = Sets.newConcurrentHashSet();
    // volatile: written by the syncing thread and read by getProcessedOffset(); a plain long has
    // neither visibility nor (on 32-bit JVMs) atomicity guarantees across threads.
    private volatile long lastBufferedOffset = 0;
    private volatile Exception throwable;

    /**
     * Producer callback bound to the offset of a single published message.
     */
    public class DataSinkCallback implements CastledProducerCallback {
        private final long messageOffset;

        /**
         * @param j offset of the message this callback reports on
         */
        public DataSinkCallback(long j) {
            this.messageOffset = j;
        }

        /**
         * Counts the message as processed and stops tracking its offset as pending.
         */
        @Override // io.castled.kafka.producer.CastledProducerCallback
        public void onSuccess(RecordMetadata recordMetadata) {
            KafkaDataSink.this.recordsProcessed.incrementAndGet();
            KafkaDataSink.this.pendingMessageIds.remove(Long.valueOf(this.messageOffset));
        }

        /**
         * Records the failure so that the syncing thread rethrows it on its next
         * {@code validateAndThrow()} check. The offset is intentionally left in the pending set,
         * so the reported processed offset never advances past a failed message.
         */
        @Override // io.castled.kafka.producer.CastledProducerCallback
        public void onFailure(RecordMetadata recordMetadata, Exception exc) {
            KafkaDataSink.this.throwable = exc;
        }
    }

    /**
     * Reads messages from the request's input stream until it returns {@code null}, publishing
     * each one to the configured topic, then flushes the producer.
     *
     * @param dataSinkRequest request supplying the app config, sync config, message input stream
     *                        and error output stream
     * @throws Exception the exception reported to a producer callback, or any exception raised
     *                   while reading, flushing or closing
     */
    @Override // io.castled.apps.DataSink
    public void syncRecords(DataSinkRequest dataSinkRequest) throws Exception {
        KafkaAppConfig kafkaAppConfig = (KafkaAppConfig) dataSinkRequest.getExternalApp().getConfig();
        KafkaAppSyncConfig kafkaAppSyncConfig = (KafkaAppSyncConfig) dataSinkRequest.getAppSyncConfig();
        KafkaProducerConfiguration producerConfiguration = KafkaProducerConfiguration.builder()
                .bootstrapServers(kafkaAppConfig.getBootstrapServers())
                .build();
        // try-with-resources closes the producer on every exit path and attaches a failing
        // close() as a suppressed exception when the body itself has already failed.
        try (CastledKafkaProducer producer = new CastledKafkaProducer(producerConfiguration)) {
            Message message;
            while ((message = dataSinkRequest.getMessageInputStream().readMessage()) != null) {
                // Abort as soon as an earlier publish has been reported as failed.
                validateAndThrow();
                long offset = message.getOffset();
                // Track the offset before publishing so the callback can never run ahead of it.
                this.pendingMessageIds.add(Long.valueOf(offset));
                publishMessage(producer, message, kafkaAppSyncConfig.getTopic(), dataSinkRequest.getErrorOutputStream());
                this.lastBufferedOffset = offset;
            }
            producer.flush();
            // Surface failures reported by callbacks that completed during the flush.
            validateAndThrow();
        }
    }

    /**
     * Publishes one message (with a {@code null} key) to the given topic.
     *
     * <p>If the publish call itself throws, the message is no longer tracked as pending, is
     * counted as processed, and is written to the error stream together with the parsed error.
     */
    private void publishMessage(CastledKafkaProducer castledKafkaProducer, Message message, String str, ErrorOutputStream errorOutputStream) {
        try {
            castledKafkaProducer.publish(new ProducerRecord<>(str, null, MessageUtils.messageToBytes(message)), new DataSinkCallback(message.getOffset()));
        } catch (Exception e) {
            this.pendingMessageIds.remove(Long.valueOf(message.getOffset()));
            this.recordsProcessed.incrementAndGet();
            errorOutputStream.writeFailedRecord(message, ((KafkaErrorParser) ObjectRegistry.getInstance(KafkaErrorParser.class)).parseException(e));
        }
    }

    /**
     * Rethrows the exception recorded by {@link DataSinkCallback#onFailure}, if there is one.
     */
    private void validateAndThrow() throws Exception {
        // Single volatile read so the null check and the throw see the same value.
        Exception failure = this.throwable;
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * @return stats built from the processed-record count and {@link #getProcessedOffset()}
     */
    @Override // io.castled.apps.DataSink
    public AppSyncStats getSyncStats() {
        return new AppSyncStats(this.recordsProcessed.get(), getProcessedOffset(), 0L);
    }

    /**
     * Returns the offset up to which messages are no longer pending.
     *
     * @return one less than the smallest pending offset, or the last buffered offset when nothing
     *         is pending
     */
    public long getProcessedOffset() {
        // Snapshot the buffered offset BEFORE scanning the pending set. Reading it afterwards
        // could pick up the offset of a message that was published after the scan and is still
        // unacknowledged, overstating progress.
        long bufferedSnapshot = this.lastBufferedOffset;
        boolean anyPending = false;
        long smallestPending = Long.MAX_VALUE;
        for (Long pendingOffset : this.pendingMessageIds) {
            long candidate = pendingOffset.longValue();
            if (!anyPending || candidate < smallestPending) {
                smallestPending = candidate;
                anyPending = true;
            }
        }
        return anyPending ? smallestPending - 1 : bufferedSnapshot;
    }
}
