package reactor.kafka.samples;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.Sender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

/* loaded from: input_file:reactor/kafka/samples/SampleProducer.class */
/**
 * Sample producer that publishes a numbered sequence of messages to a Kafka topic through the
 * reactive {@link Sender} API and prints the metadata of every acknowledged record.
 *
 * <p>Keys are the sequence numbers (serialized with {@link IntegerSerializer}); values are the
 * strings {@code "Message_<n>"} (serialized with {@link StringSerializer}).
 */
public class SampleProducer {
    private static final Logger log = LoggerFactory.getLogger(SampleProducer.class);
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "demo-topic";

    /** Number of messages sent by {@link #main(String[])}. */
    private static final int MESSAGE_COUNT = 20;

    /** Maximum time {@link #main(String[])} waits for all acknowledgements, in seconds. */
    private static final long SEND_TIMEOUT_SECONDS = 10L;

    /**
     * Formatter for record timestamps. {@link DateTimeFormatter} is immutable, so it can be used
     * from the asynchronous result callback without external synchronization.
     */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("HH:mm:ss:SSS z dd MMM yyyy").withZone(ZoneId.systemDefault());

    private final Sender<Integer, String> sender;

    /**
     * Creates a producer with client id {@code sample-producer} and {@code acks=all}.
     *
     * @param str value used for the {@code bootstrap.servers} producer property
     */
    public SampleProducer(String str) {
        HashMap<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", str);
        props.put("client.id", "sample-producer");
        props.put("acks", "all");
        props.put("key.serializer", IntegerSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        SenderOptions<Integer, String> senderOptions = SenderOptions.create(props);
        this.sender = Sender.create(senderOptions);
    }

    /**
     * Sends the messages {@code Message_1 .. Message_i} to the given topic, using the sequence
     * number both as record key and as correlation metadata.
     *
     * <p>The call only subscribes to the send pipeline and returns; results are handled in the
     * subscriber callbacks. For each result that carries record metadata, a line is printed to
     * standard output and {@code countDownLatch} is counted down once. A result without record
     * metadata, or a terminal error of the pipeline, is logged and does not count the latch down.
     *
     * @param str topic to publish to
     * @param i number of messages to send
     * @param countDownLatch latch counted down once per acknowledged message
     * @throws InterruptedException declared for compatibility with existing callers; the visible
     *     body does not block
     */
    public void sendMessages(String str, int i, CountDownLatch countDownLatch) throws InterruptedException {
        Flux<SenderRecord<Integer, String, Integer>> outbound = Flux.range(1, i)
                .map(num -> SenderRecord.create(new ProducerRecord<>(str, num, "Message_" + num), num));

        // The second argument is passed as 'true' exactly as before (delay-error flag of send).
        this.sender.send(outbound, true).subscribe(
                senderResult -> {
                    RecordMetadata recordMetadata = senderResult.recordMetadata();
                    if (recordMetadata == null) {
                        // No metadata means there is nothing to print; avoid an NPE in the callback.
                        log.error("Message {} was not acknowledged: no record metadata available",
                                senderResult.correlationMetadata());
                        return;
                    }
                    System.out.printf(
                            "Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n",
                            senderResult.correlationMetadata(),
                            recordMetadata.topic(),
                            recordMetadata.partition(),
                            recordMetadata.offset(),
                            TIMESTAMP_FORMAT.format(Instant.ofEpochMilli(recordMetadata.timestamp())));
                    countDownLatch.countDown();
                },
                // An explicit error consumer keeps a failed send from surfacing as an
                // unhandled "error callback not implemented" failure in the subscriber.
                th -> log.error("Send failed", th));
    }

    /** Closes the underlying sender. */
    public void close() {
        this.sender.close();
    }

    /**
     * Sends {@value #MESSAGE_COUNT} messages to {@code demo-topic} on {@code localhost:9092},
     * waits up to {@value #SEND_TIMEOUT_SECONDS} seconds for them to be acknowledged, and then
     * closes the producer.
     *
     * @param strArr command-line arguments (unused)
     * @throws Exception if sending or waiting fails
     */
    public static void main(String[] strArr) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(MESSAGE_COUNT);
        SampleProducer sampleProducer = new SampleProducer(BOOTSTRAP_SERVERS);
        try {
            sampleProducer.sendMessages(TOPIC, MESSAGE_COUNT, countDownLatch);
            boolean allAcknowledged = countDownLatch.await(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!allAcknowledged) {
                log.warn("Timed out after {} seconds; {} of {} messages were not acknowledged",
                        SEND_TIMEOUT_SECONDS, countDownLatch.getCount(), MESSAGE_COUNT);
            }
        } finally {
            // Always release the sender, even if sending or waiting throws.
            sampleProducer.close();
        }
    }
}
