package io.nosqlbench.driver.pulsar.ops;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverUnexpectedException;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.shade.org.apache.avro.AvroRuntimeException;

/* loaded from: input_file:io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.class */
/**
 * A single "consume one message" operation against a Pulsar {@link Consumer}.
 *
 * <p>Each invocation of {@link #run(Runnable)} receives one message (synchronously or
 * asynchronously), acknowledges it (optionally inside a {@link Transaction} that is then
 * committed), and updates the metrics obtained from the owning {@link PulsarActivity}:
 * received bytes, message size, transaction commit time, end-to-end latency, payload
 * round-trip time, and (optionally) per-topic message sequence tracking.
 */
public class PulsarConsumerOp implements PulsarOp {
    private static final Logger logger = LogManager.getLogger(PulsarConsumerOp.class);

    private final PulsarActivity pulsarActivity;
    private final boolean asyncPulsarOp;
    private final boolean useTransaction;
    private final boolean seqTracking;
    private final Supplier<Transaction> transactionSupplier;
    private final Consumer<?> consumer;
    private final Schema<?> pulsarSchema;
    private final int timeoutSeconds;
    private final EndToEndStartingTimeSource endToEndStartingTimeSource;
    private final Counter bytesCounter;
    private final Histogram messageSizeHistogram;
    private final Timer transactionCommitTimer;
    private final Histogram e2eMsgProcLatencyHistogram;
    private final Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic;
    private final Histogram payloadRttHistogram;
    private final String payloadRttTrackingField;
    // Lazily parsed from pulsarSchema the first time the raw-Avro fallback is needed.
    private org.apache.avro.Schema avroSchema;

    /**
     * Creates a consumer operation.
     *
     * @param pulsarActivity owning activity; supplies the metrics and receives async failures
     * @param asyncPulsarOp {@code true} to receive via {@code receiveAsync()}, {@code false} to block
     * @param useTransaction {@code true} to acknowledge inside a transaction and commit it
     * @param seqTracking {@code true} to feed message sequence numbers to the per-topic tracker
     * @param transactionSupplier source of the transaction; only invoked when {@code useTransaction} is set
     * @param consumer the Pulsar consumer to receive from and acknowledge on
     * @param pulsarSchema schema whose definition is used to decode raw Avro payloads
     * @param timeoutSeconds receive/acknowledge timeout in seconds; {@code <= 0} means no timeout
     * @param endToEndStartingTimeSource which timestamp to use as the start of end-to-end latency
     * @param receivedMessageSequenceTrackerForTopic maps a topic name to its sequence tracker
     * @param payloadRttTrackingField name of the payload field holding the send timestamp;
     *                                empty disables payload round-trip tracking
     */
    public PulsarConsumerOp(PulsarActivity pulsarActivity,
                            boolean asyncPulsarOp,
                            boolean useTransaction,
                            boolean seqTracking,
                            Supplier<Transaction> transactionSupplier,
                            Consumer<?> consumer,
                            Schema<?> pulsarSchema,
                            int timeoutSeconds,
                            EndToEndStartingTimeSource endToEndStartingTimeSource,
                            Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic,
                            String payloadRttTrackingField) {
        this.pulsarActivity = pulsarActivity;
        this.asyncPulsarOp = asyncPulsarOp;
        this.useTransaction = useTransaction;
        this.seqTracking = seqTracking;
        this.transactionSupplier = transactionSupplier;
        this.consumer = consumer;
        this.pulsarSchema = pulsarSchema;
        this.timeoutSeconds = timeoutSeconds;
        this.endToEndStartingTimeSource = endToEndStartingTimeSource;
        this.bytesCounter = pulsarActivity.getBytesCounter();
        this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram();
        this.transactionCommitTimer = pulsarActivity.getCommitTransactionTimer();
        this.e2eMsgProcLatencyHistogram = pulsarActivity.getE2eMsgProcLatencyHistogram();
        this.payloadRttHistogram = pulsarActivity.getPayloadRttHistogram();
        this.receivedMessageSequenceTrackerForTopic = receivedMessageSequenceTrackerForTopic;
        this.payloadRttTrackingField = payloadRttTrackingField;
    }

    /**
     * Reports the message's sequence number (if it carries one) to the tracker of its topic.
     * Messages without the sequence-number property, or with a blank one, are ignored.
     */
    private void checkAndUpdateMessageErrorCounter(Message<?> message) {
        String sequenceNumber = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_NUMBER);
        if (StringUtils.isBlank(sequenceNumber)) {
            return;
        }
        this.receivedMessageSequenceTrackerForTopic
            .apply(message.getTopicName())
            .sequenceNumberReceived(Long.parseLong(sequenceNumber));
    }

    /**
     * Receives and processes one message.
     *
     * @param runnable completion callback from the {@link PulsarOp} contract; not used by this operation
     * @throws PulsarDriverUnexpectedException if the synchronous receive/processing fails, or if the
     *         asynchronous receive cannot be started
     */
    @Override // io.nosqlbench.driver.pulsar.ops.PulsarOp
    public void run(Runnable runnable) {
        final Transaction transaction = this.useTransaction ? this.transactionSupplier.get() : null;
        if (this.asyncPulsarOp) {
            receiveAsynchronously(transaction);
        } else {
            receiveSynchronously(transaction);
        }
    }

    /**
     * Starts an asynchronous receive and processes the message in the completion callback.
     * Failures of the receive or of the processing are reported to the activity via
     * {@code asyncOperationFailed}.
     */
    private void receiveAsynchronously(Transaction transaction) {
        try {
            // The transaction is deliberately NOT committed at this point: handleMessage()
            // acknowledges the message within the transaction and then commits it. Committing
            // here as well would commit the transaction twice and issue the acknowledgement
            // against an already-committed transaction.
            CompletableFuture<? extends Message<?>> received = this.consumer.receiveAsync();
            received.thenAccept(message -> {
                try {
                    handleMessage(transaction, message);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e) {
                    this.pulsarActivity.asyncOperationFailed(e.getCause());
                } catch (PulsarClientException | TimeoutException e) {
                    this.pulsarActivity.asyncOperationFailed(e);
                }
            }).exceptionally(failure -> {
                this.pulsarActivity.asyncOperationFailed(failure);
                return null;
            });
        } catch (Exception e) {
            throw new PulsarDriverUnexpectedException(e);
        }
    }

    /**
     * Blocks until a message is received (bounded by {@code timeoutSeconds} when positive)
     * and processes it on the calling thread.
     */
    private void receiveSynchronously(Transaction transaction) {
        try {
            final Message<?> message;
            if (this.timeoutSeconds <= 0) {
                message = this.consumer.receive();
            } else {
                message = this.consumer.receive(this.timeoutSeconds, TimeUnit.SECONDS);
                if (message == null) {
                    throw new TimeoutException("Did not receive a message within " + this.timeoutSeconds + " seconds");
                }
            }
            handleMessage(transaction, message);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            logger.error("Sync message receiving failed - timeout value: {} seconds ", Integer.valueOf(this.timeoutSeconds), e);
            throw new PulsarDriverUnexpectedException("Sync message receiving failed - timeout value: " + this.timeoutSeconds + " seconds ");
        }
    }

    /**
     * Acknowledges a received message and records all metrics derived from it.
     *
     * @param transaction transaction to acknowledge within and commit; only used when transactions are enabled
     * @param message the received message
     * @throws PulsarClientException declared for callers that handle Pulsar client failures
     * @throws InterruptedException if interrupted while waiting for the acknowledgement or the commit
     * @throws ExecutionException if the acknowledgement or the commit completes exceptionally
     * @throws TimeoutException if the acknowledgement does not complete within {@code timeoutSeconds}
     */
    private void handleMessage(Transaction transaction, Message<?> message) throws PulsarClientException, InterruptedException, ExecutionException, TimeoutException {
        acknowledge(transaction, message);
        logReceivedMessage(message);
        recordPayloadRtt(message);
        recordEndToEndLatency(message);
        if (this.seqTracking) {
            checkAndUpdateMessageErrorCounter(message);
        }
        int messageSize = message.getData().length;
        this.bytesCounter.inc(messageSize);
        this.messageSizeHistogram.update(messageSize);
    }

    /**
     * Acknowledges the message and waits for the acknowledgement to complete. With transactions
     * enabled the acknowledgement is made within the transaction, which is then committed; the
     * commit duration is recorded in the transaction commit timer.
     */
    private void acknowledge(Transaction transaction, Message<?> message) throws InterruptedException, ExecutionException, TimeoutException {
        if (!this.useTransaction) {
            awaitWithTimeout(this.consumer.acknowledgeAsync(message.getMessageId()));
            return;
        }
        awaitWithTimeout(this.consumer.acknowledgeAsync(message.getMessageId(), transaction));
        try (Timer.Context ignored = this.transactionCommitTimer.time()) {
            transaction.commit().get();
        }
    }

    /**
     * Waits for {@code future}, bounded by {@code timeoutSeconds} when it is positive. A
     * non-positive timeout waits indefinitely, matching how the synchronous receive treats it;
     * passing it to {@code get(timeout, unit)} would time out at once unless the future had
     * already completed.
     */
    private void awaitWithTimeout(CompletableFuture<?> future) throws InterruptedException, ExecutionException, TimeoutException {
        if (this.timeoutSeconds > 0) {
            future.get(this.timeoutSeconds, TimeUnit.SECONDS);
        } else {
            future.get();
        }
    }

    /** Logs key, properties and payload of the message when debug logging is enabled. */
    private void logReceivedMessage(Message<?> message) {
        if (!logger.isDebugEnabled()) {
            return;
        }
        Object value = message.getValue();
        Object payload = (value instanceof GenericObject)
            ? ((GenericObject) value).getNativeObject()
            // Decode explicitly as UTF-8 rather than with the platform default charset.
            : new String(message.getData(), java.nio.charset.StandardCharsets.UTF_8);
        logger.debug("({}) message received: msg-key={}; msg-properties={}; msg-payload={}", this.consumer.getConsumerName(), message.getKey(), message.getProperties(), payload);
    }

    /**
     * Updates the payload round-trip histogram (in nanoseconds) from the timestamp stored in the
     * configured payload field. The field value is interpreted as epoch milliseconds. Does
     * nothing when no tracking field is configured or the field carries no value.
     */
    private void recordPayloadRtt(Message<?> message) {
        if (StringUtils.isEmpty(this.payloadRttTrackingField)) {
            return;
        }
        Object value = message.getValue();
        Long sentAtMillis;
        if (value instanceof GenericRecord) {
            GenericRecord record = (GenericRecord) value;
            Object rawTimestamp = readRttField(record);
            if (rawTimestamp == null) {
                logger.error("Cannot find {} in value {}", this.payloadRttTrackingField, record);
            }
            sentAtMillis = toLong(rawTimestamp);
        } else {
            // Not a Pulsar GenericRecord: decode the raw bytes with the configured Avro schema.
            org.apache.avro.generic.GenericRecord avroRecord =
                AvroUtil.GetGenericRecord_ApacheAvro(getSchemaFromConfiguration(), message.getData());
            sentAtMillis = avroRecord.hasField(this.payloadRttTrackingField)
                ? toLong(avroRecord.get(this.payloadRttTrackingField))
                : null;
        }
        if (sentAtMillis != null) {
            this.payloadRttHistogram.update(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - sentAtMillis.longValue()));
        }
    }

    /**
     * Reads the tracking field from a Pulsar generic record. For a key/value payload the key
     * record is consulted first and the value record second.
     *
     * @return the raw field value; may be {@code null} for a non key/value record
     * @throws RuntimeException if the payload is a key/value pair and neither part yields the field
     */
    private Object readRttField(GenericRecord record) {
        Object nativeObject = record.getNativeObject();
        if (!(nativeObject instanceof KeyValue)) {
            return record.getField(this.payloadRttTrackingField);
        }
        KeyValue<?, ?> keyValue = (KeyValue<?, ?>) nativeObject;
        Object fieldValue = null;
        if (keyValue.getKey() instanceof GenericRecord) {
            GenericRecord keyRecord = (GenericRecord) keyValue.getKey();
            try {
                fieldValue = keyRecord.getField(this.payloadRttTrackingField);
            } catch (AvroRuntimeException e) {
                logger.error("Cannot find {} in key {}: {}", this.payloadRttTrackingField, keyRecord, e);
            }
        }
        if (fieldValue == null && keyValue.getValue() instanceof GenericRecord) {
            GenericRecord valueRecord = (GenericRecord) keyValue.getValue();
            try {
                fieldValue = valueRecord.getField(this.payloadRttTrackingField);
            } catch (AvroRuntimeException e) {
                logger.error("Cannot find {} in value {}: {}", this.payloadRttTrackingField, valueRecord, e);
            }
        }
        if (fieldValue == null) {
            throw new RuntimeException("Cannot find field " + this.payloadRttTrackingField + " in " + keyValue.getKey() + " and " + keyValue.getValue());
        }
        return fieldValue;
    }

    /**
     * Converts a raw field value to a {@code Long}: numbers are narrowed/widened via
     * {@code longValue()}, anything else is parsed from its string form.
     *
     * @return the converted value, or {@code null} when {@code raw} is {@code null}
     * @throws NumberFormatException if a non-numeric value cannot be parsed as a long
     */
    private static Long toLong(Object raw) {
        if (raw == null) {
            return null;
        }
        if (raw instanceof Number) {
            return Long.valueOf(((Number) raw).longValue());
        }
        return Long.valueOf(raw.toString());
    }

    /**
     * Updates the end-to-end latency histogram with "now minus starting time" in milliseconds,
     * where the starting time comes from the configured source. Nothing is recorded when the
     * source is {@code NONE} or the resolved starting time is {@code 0}.
     */
    private void recordEndToEndLatency(Message<?> message) {
        if (this.endToEndStartingTimeSource == EndToEndStartingTimeSource.NONE) {
            return;
        }
        long startingTimeMillis = 0L;
        switch (this.endToEndStartingTimeSource) {
            case MESSAGE_PUBLISH_TIME:
                startingTimeMillis = message.getPublishTime();
                break;
            case MESSAGE_EVENT_TIME:
                startingTimeMillis = message.getEventTime();
                break;
            case MESSAGE_PROPERTY_E2E_STARTING_TIME:
                String startingTimeProperty = message.getProperty("e2e_starting_time");
                startingTimeMillis = startingTimeProperty != null ? Long.parseLong(startingTimeProperty) : 0L;
                break;
            default:
                break;
        }
        if (startingTimeMillis != 0L) {
            this.e2eMsgProcLatencyHistogram.update(System.currentTimeMillis() - startingTimeMillis);
        }
    }

    /**
     * Returns the Avro schema parsed from the Pulsar schema definition, parsing it on first use
     * and reusing the parsed instance afterwards.
     */
    private org.apache.avro.Schema getSchemaFromConfiguration() {
        if (this.avroSchema == null) {
            String schemaDefinition = this.pulsarSchema.getSchemaInfo().getSchemaDefinition();
            this.avroSchema = AvroUtil.GetSchema_ApacheAvro(schemaDefinition);
        }
        return this.avroSchema;
    }
}
