package io.castled.kafka.consumer;

import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/castled/kafka/consumer/BaseKafkaConsumer.class */
public abstract class BaseKafkaConsumer implements Runnable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) BaseKafkaConsumer.class);
    private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private final AtomicReference<ConsumerState> consumerState = new AtomicReference<>(ConsumerState.RUNNING);
    private final KafkaConsumerConfiguration kafkaConsumerConfiguration;

    public BaseKafkaConsumer(KafkaConsumerConfiguration kafkaConsumerConfiguration) {
        HashMap hashMap = new HashMap();
        hashMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerConfiguration.getConsumerGroup());
        hashMap.put("bootstrap.servers", kafkaConsumerConfiguration.getBootstrapServers());
        hashMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
        hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
        hashMap.put("request.timeout.ms", 60000);
        hashMap.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 8388608);
        hashMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        this.kafkaConsumer = new KafkaConsumer<>(hashMap);
        this.kafkaConsumerConfiguration = kafkaConsumerConfiguration;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.kafkaConsumer.subscribe(Collections.singleton(this.kafkaConsumerConfiguration.getTopic()));
        while (this.consumerState.get() == ConsumerState.RUNNING) {
            ConsumerRecords<byte[], byte[]> poll = this.kafkaConsumer.poll(Duration.ofSeconds(10L));
            for (TopicPartition topicPartition : poll.partitions()) {
                processPartition(topicPartition, poll.records(topicPartition));
            }
        }
    }

    private void processPartition(TopicPartition topicPartition, List<ConsumerRecord<byte[], byte[]>> list) {
        try {
            long processRecords = processRecords(list);
            if (processRecords != -1) {
                this.kafkaConsumer.commitSync(ImmutableMap.of(topicPartition, new OffsetAndMetadata(processRecords + 1)));
            }
        } catch (KafkaRetriableException e) {
            if (e.getLastProcessedOffset() != -1) {
                this.kafkaConsumer.seek(topicPartition, e.getLastProcessedOffset() + 1);
            } else {
                this.kafkaConsumer.seek(topicPartition, list.get(0).offset());
            }
        } catch (Exception e2) {
            log.error("Failed to process records for topic {} and partition {}", topicPartition.topic(), Integer.valueOf(topicPartition.partition()), e2);
            if (this.kafkaConsumerConfiguration.isRetryOnUnhandledFailures()) {
                this.kafkaConsumer.seek(topicPartition, list.get(0).offset());
            }
        }
    }

    public abstract long processRecords(List<ConsumerRecord<byte[], byte[]>> list) throws Exception;

    public void stop() throws Exception {
        this.consumerState.set(ConsumerState.TERMINATED);
    }
}
