package reactor.kafka.samples;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Cancellation;
import reactor.core.publisher.BlockingSink;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.Receiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.Sender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

/* loaded from: input_file:reactor/kafka/samples/SampleScenarios.class */
public class SampleScenarios {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final Logger log = LoggerFactory.getLogger(SampleScenarios.class.getName());
    private static final String[] TOPICS = {"sample-topic1", "sample-topic2", "sample-topic3"};

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/samples/SampleScenarios$AbstractScenario.class */
    public static abstract class AbstractScenario {
        String bootstrapServers;
        CommittableSource source;
        Sender<Integer, Person> sender;
        String groupId = "sample-group";
        List<Cancellation> cancellations = new ArrayList();

        AbstractScenario(String str) {
            this.bootstrapServers = SampleScenarios.BOOTSTRAP_SERVERS;
            this.bootstrapServers = str;
        }

        public abstract Flux<?> flux();

        public void runScenario() throws InterruptedException {
            flux().blockLast();
            close();
        }

        public void close() {
            if (this.sender != null) {
                this.sender.close();
            }
            Iterator<Cancellation> it = this.cancellations.iterator();
            while (it.hasNext()) {
                it.next().dispose();
            }
        }

        public SenderOptions<Integer, Person> senderOptions() {
            HashMap hashMap = new HashMap();
            hashMap.put("bootstrap.servers", this.bootstrapServers);
            hashMap.put("client.id", "sample-producer");
            hashMap.put("acks", "all");
            hashMap.put("key.serializer", IntegerSerializer.class);
            hashMap.put("value.serializer", PersonSerDes.class);
            return SenderOptions.create(hashMap);
        }

        public Sender<Integer, Person> sender(SenderOptions<Integer, Person> senderOptions) {
            return Sender.create(senderOptions);
        }

        public ReceiverOptions<Integer, Person> receiverOptions() {
            HashMap hashMap = new HashMap();
            hashMap.put("bootstrap.servers", this.bootstrapServers);
            hashMap.put("group.id", this.groupId);
            hashMap.put("client.id", "sample-consumer");
            hashMap.put("key.deserializer", IntegerDeserializer.class);
            hashMap.put("value.deserializer", PersonSerDes.class);
            return ReceiverOptions.create(hashMap);
        }

        public ReceiverOptions<Integer, Person> receiverOptions(Collection<String> collection) {
            return receiverOptions().addAssignListener(collection2 -> {
                SampleScenarios.log.info("Group {} partitions assigned {}", this.groupId, collection2);
            }).addRevokeListener(collection3 -> {
                SampleScenarios.log.info("Group {} partitions assigned {}", this.groupId, collection3);
            }).subscription(collection);
        }

        public void source(CommittableSource committableSource) {
            this.source = committableSource;
        }

        public CommittableSource source() {
            return this.source;
        }
    }

    /* loaded from: input_file:reactor/kafka/samples/SampleScenarios$AtmostOnce.class */
    public static class AtmostOnce extends AbstractScenario {
        private final String sourceTopic;
        private final String destTopic;

        public AtmostOnce(String str, String str2, String str3) {
            super(str);
            this.sourceTopic = str2;
            this.destTopic = str3;
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public Flux<?> flux() {
            return sender(senderOptions().producerProperty("acks", "0").producerProperty("retries", "0")).send(Receiver.create(receiverOptions(Collections.singleton(this.sourceTopic))).receiveAtmostOnce().map(consumerRecord -> {
                return SenderRecord.create(transform((Person) consumerRecord.value()), Long.valueOf(consumerRecord.offset()));
            }), true);
        }

        public ProducerRecord<Integer, Person> transform(Person person) {
            Person person2 = new Person(person.id(), person.firstName(), person.lastName());
            person2.email(person.firstName().toLowerCase(Locale.ROOT) + "@kafka.io");
            return new ProducerRecord<>(this.destTopic, Integer.valueOf(person.id()), person2);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ CommittableSource source() {
            return super.source();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void source(CommittableSource committableSource) {
            super.source(committableSource);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ ReceiverOptions receiverOptions(Collection collection) {
            return super.receiverOptions(collection);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ ReceiverOptions receiverOptions() {
            return super.receiverOptions();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ Sender sender(SenderOptions senderOptions) {
            return super.sender(senderOptions);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ SenderOptions senderOptions() {
            return super.senderOptions();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void close() {
            super.close();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void runScenario() throws InterruptedException {
            super.runScenario();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/samples/SampleScenarios$CommittableSource.class */
    public static class CommittableSource {
        private List<Person> sourceList = new ArrayList();

        CommittableSource() {
            this.sourceList.add(new Person(1, "John", "Doe"));
            this.sourceList.add(new Person(1, "Ada", "Lovelace"));
        }

        CommittableSource(List<Person> list) {
            this.sourceList.addAll(list);
        }

        Flux<Person> flux() {
            return Flux.fromIterable(this.sourceList);
        }

        void commit(int i) {
            SampleScenarios.log.trace("Committing {}", Integer.valueOf(i));
        }
    }

    /* loaded from: input_file:reactor/kafka/samples/SampleScenarios$FanOut.class */
    public static class FanOut extends AbstractScenario {
        private final String sourceTopic;
        private final String destTopic1;
        private final String destTopic2;

        public FanOut(String str, String str2, String str3, String str4) {
            super(str);
            this.sourceTopic = str2;
            this.destTopic1 = str3;
            this.destTopic2 = str4;
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public Flux<?> flux() {
            Scheduler newSingle = Schedulers.newSingle("sample1", true);
            Scheduler newSingle2 = Schedulers.newSingle("sample2", true);
            this.sender = sender(senderOptions());
            EmitterProcessor create = EmitterProcessor.create();
            BlockingSink connectSink = create.connectSink();
            Flux doOnNext = Receiver.create(receiverOptions(Collections.singleton(this.sourceTopic))).receiveAutoAck().concatMap(flux -> {
                return flux;
            }).doOnNext(consumerRecord -> {
                SampleScenarios.log.debug("Emit {} {}", Integer.valueOf(((Person) consumerRecord.value()).id()), connectSink.emit(consumerRecord.value()));
            });
            Publisher send = this.sender.send(create.publishOn(newSingle).map(person -> {
                return SenderRecord.create(process1(person, true), Integer.valueOf(person.id()));
            }), false);
            Publisher send2 = this.sender.send(create.publishOn(newSingle2).map(person2 -> {
                return SenderRecord.create(process2(person2, true), Integer.valueOf(person2.id()));
            }), false);
            AtomicReference atomicReference = new AtomicReference();
            Consumer consumer = atomicReference2 -> {
                Cancellation cancellation = (Cancellation) atomicReference2.getAndSet(null);
                if (cancellation != null) {
                    cancellation.dispose();
                }
            };
            return Flux.merge(new Publisher[]{send, send2}).doOnSubscribe(subscription -> {
                atomicReference.set(doOnNext.subscribe());
            }).doOnCancel(() -> {
                consumer.accept(atomicReference);
            });
        }

        public ProducerRecord<Integer, Person> process1(Person person, boolean z) {
            if (z) {
                SampleScenarios.log.debug("Processing person {} on stream1 in thread {}", Integer.valueOf(person.id()), Thread.currentThread().getName());
            }
            Person person2 = new Person(person.id(), person.firstName(), person.lastName());
            person2.email(person.firstName().toLowerCase(Locale.ROOT) + "@kafka.io");
            return new ProducerRecord<>(this.destTopic1, Integer.valueOf(person.id()), person2);
        }

        public ProducerRecord<Integer, Person> process2(Person person, boolean z) {
            if (z) {
                SampleScenarios.log.debug("Processing person {} on stream2 in thread {}", Integer.valueOf(person.id()), Thread.currentThread().getName());
            }
            Person person2 = new Person(person.id(), person.firstName(), person.lastName());
            person2.email(person.lastName().toLowerCase(Locale.ROOT) + "@reactor.io");
            return new ProducerRecord<>(this.destTopic2, Integer.valueOf(person.id()), person2);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ CommittableSource source() {
            return super.source();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void source(CommittableSource committableSource) {
            super.source(committableSource);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ ReceiverOptions receiverOptions(Collection collection) {
            return super.receiverOptions(collection);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ ReceiverOptions receiverOptions() {
            return super.receiverOptions();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ Sender sender(SenderOptions senderOptions) {
            return super.sender(senderOptions);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ SenderOptions senderOptions() {
            return super.senderOptions();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void close() {
            super.close();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void runScenario() throws InterruptedException {
            super.runScenario();
        }
    }

    /* loaded from: input_file:reactor/kafka/samples/SampleScenarios$KafkaSink.class */
    public static class KafkaSink extends AbstractScenario {
        private final String topic;

        public KafkaSink(String str, String str2) {
            super(str);
            this.topic = str2;
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public Flux<?> flux() {
            return sender(senderOptions().producerProperty("acks", "all").producerProperty("max.block.ms", Long.MAX_VALUE).producerProperty("retries", Integer.MAX_VALUE)).send(source().flux().map(person -> {
                return SenderRecord.create(new ProducerRecord(this.topic, Integer.valueOf(person.id()), person), Integer.valueOf(person.id()));
            }), false).doOnError(th -> {
                SampleScenarios.log.error("Send failed, terminating.", th);
            }).doOnNext(senderResult -> {
                int intValue = ((Integer) senderResult.correlationMetadata()).intValue();
                SampleScenarios.log.trace("Successfully stored person with id {} in Kafka", Integer.valueOf(intValue));
                this.source.commit(intValue);
            });
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ CommittableSource source() {
            return super.source();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void source(CommittableSource committableSource) {
            super.source(committableSource);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ ReceiverOptions receiverOptions(Collection collection) {
            return super.receiverOptions(collection);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ ReceiverOptions receiverOptions() {
            return super.receiverOptions();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ Sender sender(SenderOptions senderOptions) {
            return super.sender(senderOptions);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ SenderOptions senderOptions() {
            return super.senderOptions();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void close() {
            super.close();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void runScenario() throws InterruptedException {
            super.runScenario();
        }
    }

    /* loaded from: input_file:reactor/kafka/samples/SampleScenarios$KafkaSource.class */
    public static class KafkaSource extends AbstractScenario {
        private final String topic;

        public KafkaSource(String str, String str2) {
            super(str);
            this.topic = str2;
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public Flux<?> flux() {
            return Receiver.create(receiverOptions(Collections.singletonList(this.topic)).commitInterval(Duration.ZERO)).receive().publishOn(Schedulers.newSingle("sample", true)).concatMap(receiverRecord -> {
                return storeInDB((Person) receiverRecord.record().value()).doOnSuccess(r3 -> {
                });
            }).retry();
        }

        public Mono<Void> storeInDB(Person person) {
            SampleScenarios.log.info("Successfully processed person with id {} from Kafka", Integer.valueOf(person.id()));
            return Mono.empty();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ CommittableSource source() {
            return super.source();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void source(CommittableSource committableSource) {
            super.source(committableSource);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ ReceiverOptions receiverOptions(Collection collection) {
            return super.receiverOptions(collection);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ ReceiverOptions receiverOptions() {
            return super.receiverOptions();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ Sender sender(SenderOptions senderOptions) {
            return super.sender(senderOptions);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ SenderOptions senderOptions() {
            return super.senderOptions();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void close() {
            super.close();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void runScenario() throws InterruptedException {
            super.runScenario();
        }
    }

    /* loaded from: input_file:reactor/kafka/samples/SampleScenarios$KafkaTransform.class */
    public static class KafkaTransform extends AbstractScenario {
        private final String sourceTopic;
        private final String destTopic;

        public KafkaTransform(String str, String str2, String str3) {
            super(str);
            this.sourceTopic = str2;
            this.destTopic = str3;
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public Flux<?> flux() {
            return sender(senderOptions()).send(Receiver.create(receiverOptions(Collections.singleton(this.sourceTopic))).receive().map(receiverRecord -> {
                return SenderRecord.create(transform((Person) receiverRecord.record().value()), receiverRecord.offset());
            }), false).doOnNext(senderResult -> {
                ((ReceiverOffset) senderResult.correlationMetadata()).acknowledge();
            });
        }

        public ProducerRecord<Integer, Person> transform(Person person) {
            Person person2 = new Person(person.id(), person.firstName(), person.lastName());
            person2.email(person.firstName().toLowerCase(Locale.ROOT) + "@kafka.io");
            return new ProducerRecord<>(this.destTopic, Integer.valueOf(person.id()), person2);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ CommittableSource source() {
            return super.source();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void source(CommittableSource committableSource) {
            super.source(committableSource);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ ReceiverOptions receiverOptions(Collection collection) {
            return super.receiverOptions(collection);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ ReceiverOptions receiverOptions() {
            return super.receiverOptions();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ Sender sender(SenderOptions senderOptions) {
            return super.sender(senderOptions);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ SenderOptions senderOptions() {
            return super.senderOptions();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void close() {
            super.close();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void runScenario() throws InterruptedException {
            super.runScenario();
        }
    }

    /* loaded from: input_file:reactor/kafka/samples/SampleScenarios$PartitionProcessor.class */
    public static class PartitionProcessor extends AbstractScenario {
        private final String topic;

        public PartitionProcessor(String str, String str2) {
            super(str);
            this.topic = str2;
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public Flux<?> flux() {
            Scheduler newElastic = Schedulers.newElastic("sample", 60, true);
            return Receiver.create(receiverOptions(Collections.singleton(this.topic)).commitInterval(Duration.ZERO)).receive().groupBy(receiverRecord -> {
                return receiverRecord.offset().topicPartition();
            }).flatMap(groupedFlux -> {
                return groupedFlux.publishOn(newElastic).map(receiverRecord2 -> {
                    return processRecord((TopicPartition) groupedFlux.key(), receiverRecord2);
                }).sample(Duration.ofMillis(5000L)).concatMap(receiverOffset -> {
                    return receiverOffset.commit();
                });
            });
        }

        public ReceiverOffset processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, Person> receiverRecord) {
            SampleScenarios.log.info("Processing record {} from partition {} in thread{}", new Object[]{Integer.valueOf(((Person) receiverRecord.record().value()).id()), topicPartition, Thread.currentThread().getName()});
            return receiverRecord.offset();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ CommittableSource source() {
            return super.source();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void source(CommittableSource committableSource) {
            super.source(committableSource);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ ReceiverOptions receiverOptions(Collection collection) {
            return super.receiverOptions(collection);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ ReceiverOptions receiverOptions() {
            return super.receiverOptions();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ Sender sender(SenderOptions senderOptions) {
            return super.sender(senderOptions);
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ SenderOptions senderOptions() {
            return super.senderOptions();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void close() {
            super.close();
        }

        @Override // reactor.kafka.samples.SampleScenarios.AbstractScenario
        public /* bridge */ /* synthetic */ void runScenario() throws InterruptedException {
            super.runScenario();
        }
    }

    /* loaded from: input_file:reactor/kafka/samples/SampleScenarios$Person.class */
    public static class Person {
        private final int id;
        private final String firstName;
        private final String lastName;
        private String email;

        public Person(int i, String str, String str2) {
            this.id = i;
            this.firstName = str;
            this.lastName = str2;
        }

        public int id() {
            return this.id;
        }

        public String firstName() {
            return this.firstName;
        }

        public String lastName() {
            return this.lastName;
        }

        public void email(String str) {
            this.email = str;
        }

        public String email() {
            return this.email == null ? "" : this.email;
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof Person)) {
                return false;
            }
            Person person = (Person) obj;
            return this.id == person.id && stringEquals(this.firstName, person.firstName) && stringEquals(this.lastName, person.lastName) && stringEquals(this.email, person.email);
        }

        public int hashCode() {
            return (31 * ((31 * Integer.hashCode(this.id)) + (this.firstName != null ? this.firstName.hashCode() : 0))) + (this.lastName != null ? this.lastName.hashCode() : 0);
        }

        public String toString() {
            return "Person{id='" + this.id + "', firstName='" + this.firstName + "', lastName='" + this.lastName + "'}";
        }

        private boolean stringEquals(String str, String str2) {
            return str == null ? str2 == null : str.equals(str2);
        }
    }

    /* loaded from: input_file:reactor/kafka/samples/SampleScenarios$PersonSerDes.class */
    public static class PersonSerDes implements Serializer<Person>, Deserializer<Person> {
        public void configure(Map<String, ?> map, boolean z) {
        }

        public byte[] serialize(String str, Person person) {
            byte[] bytes = person.firstName().getBytes(StandardCharsets.UTF_8);
            byte[] bytes2 = person.lastName().getBytes(StandardCharsets.UTF_8);
            byte[] bytes3 = person.email().getBytes(StandardCharsets.UTF_8);
            ByteBuffer allocate = ByteBuffer.allocate(8 + bytes.length + 4 + bytes2.length + 4 + bytes3.length);
            allocate.putInt(person.id());
            allocate.putInt(bytes.length);
            allocate.put(bytes);
            allocate.putInt(bytes2.length);
            allocate.put(bytes2);
            allocate.putInt(bytes3.length);
            allocate.put(bytes3);
            return allocate.array();
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Person m4deserialize(String str, byte[] bArr) {
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            int i = wrap.getInt();
            byte[] bArr2 = new byte[wrap.getInt()];
            wrap.get(bArr2);
            String str2 = new String(bArr2, StandardCharsets.UTF_8);
            byte[] bArr3 = new byte[wrap.getInt()];
            wrap.get(bArr3);
            Person person = new Person(i, str2, new String(bArr3, StandardCharsets.UTF_8));
            byte[] bArr4 = new byte[wrap.getInt()];
            if (bArr4.length > 0) {
                wrap.get(bArr4);
                person.email(new String(bArr4, StandardCharsets.UTF_8));
            }
            return person;
        }

        public void close() {
        }
    }

    /* loaded from: input_file:reactor/kafka/samples/SampleScenarios$Scenario.class */
    enum Scenario {
        KAFKA_SINK,
        KAFKA_SOURCE,
        KAFKA_TRANSFORM,
        ATMOST_ONCE,
        FAN_OUT,
        PARTITION
    }

    public static void main(String[] strArr) throws Exception {
        AbstractScenario partitionProcessor;
        if (strArr.length != 1) {
            System.out.println("Usage: " + SampleScenarios.class.getName() + " <scenario>");
            System.exit(1);
        }
        Scenario valueOf = Scenario.valueOf(strArr[0]);
        switch (valueOf) {
            case KAFKA_SINK:
                partitionProcessor = new KafkaSink(BOOTSTRAP_SERVERS, TOPICS[0]);
                break;
            case KAFKA_SOURCE:
                partitionProcessor = new KafkaSource(BOOTSTRAP_SERVERS, TOPICS[0]);
                break;
            case KAFKA_TRANSFORM:
                partitionProcessor = new KafkaTransform(BOOTSTRAP_SERVERS, TOPICS[0], TOPICS[1]);
                break;
            case ATMOST_ONCE:
                partitionProcessor = new AtmostOnce(BOOTSTRAP_SERVERS, TOPICS[0], TOPICS[1]);
                break;
            case FAN_OUT:
                partitionProcessor = new FanOut(BOOTSTRAP_SERVERS, TOPICS[0], TOPICS[1], TOPICS[2]);
                break;
            case PARTITION:
                partitionProcessor = new PartitionProcessor(BOOTSTRAP_SERVERS, TOPICS[0]);
                break;
            default:
                throw new IllegalArgumentException("Unsupported scenario " + valueOf);
        }
        partitionProcessor.runScenario();
    }
}
