package io.nosqlbench.driver.pulsar.ops;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverParamException;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverUnexpectedException;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;

/* loaded from: input_file:io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.class */
public class PulsarProducerOp implements PulsarOp {
    private static final Logger logger = LogManager.getLogger(PulsarProducerOp.class);
    private final PulsarActivity pulsarActivity;
    private final boolean asyncPulsarOp;
    private final boolean useTransaction;
    private final Supplier<Transaction> transactionSupplier;
    private final Producer<?> producer;
    private final Schema<?> pulsarSchema;
    private final String msgKey;
    private final Map<String, String> msgProperties;
    private final String msgPayload;
    private final Counter bytesCounter;
    private final Histogram messageSizeHistogram;
    private final Timer transactionCommitTimer;
    private org.apache.avro.Schema avroSchema;
    private org.apache.avro.Schema avroKeySchema;

    public PulsarProducerOp(PulsarActivity pulsarActivity, boolean z, boolean z2, Supplier<Transaction> supplier, Producer<?> producer, Schema<?> schema, String str, Map<String, String> map, String str2) {
        this.pulsarActivity = pulsarActivity;
        this.asyncPulsarOp = z;
        this.useTransaction = z2;
        this.transactionSupplier = supplier;
        this.producer = producer;
        this.pulsarSchema = schema;
        this.msgKey = str;
        this.msgProperties = map;
        this.msgPayload = str2;
        this.bytesCounter = pulsarActivity.getBytesCounter();
        this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram();
        this.transactionCommitTimer = pulsarActivity.getCommitTransactionTimer();
    }

    @Override // io.nosqlbench.driver.pulsar.ops.PulsarOp
    public void run(Runnable runnable) {
        Transaction transaction;
        TypedMessageBuilder newMessage;
        TypedMessageBuilder value;
        int length;
        if (StringUtils.isBlank(this.msgPayload)) {
            throw new PulsarDriverParamException("Message payload (\"msg-value\") can't be empty!");
        }
        if (this.useTransaction) {
            transaction = this.transactionSupplier.get();
            newMessage = this.producer.newMessage(transaction);
        } else {
            transaction = null;
            newMessage = this.producer.newMessage(this.pulsarSchema);
        }
        if (!StringUtils.isBlank(this.msgKey)) {
            newMessage = newMessage.key(this.msgKey);
        }
        if (!this.msgProperties.isEmpty()) {
            newMessage = newMessage.properties(this.msgProperties);
        }
        SchemaType type = this.pulsarSchema.getSchemaInfo().getType();
        if (this.pulsarSchema instanceof KeyValueSchema) {
            int indexOf = this.msgPayload.indexOf("}||{");
            if (indexOf < 0) {
                throw new IllegalArgumentException("KeyValue payload MUST be in form {KEY IN JSON}||{VALUE IN JSON} (with 2 pipes that separate the KEY part from the VALUE part)");
            }
            String substring = this.msgPayload.substring(0, indexOf + 1);
            String substring2 = this.msgPayload.substring(indexOf + 3);
            KeyValueSchema keyValueSchema = this.pulsarSchema;
            value = newMessage.value(new KeyValue(AvroUtil.GetGenericRecord_PulsarAvro(keyValueSchema.getKeySchema(), getKeyAvroSchemaFromConfiguration(), substring), AvroUtil.GetGenericRecord_PulsarAvro(keyValueSchema.getValueSchema(), getAvroSchemaFromConfiguration(), substring2)));
            length = this.msgPayload.length();
        } else if (PulsarActivityUtil.isAvroSchemaTypeStr(type.name())) {
            value = newMessage.value(AvroUtil.GetGenericRecord_PulsarAvro(this.pulsarSchema, this.pulsarSchema.getSchemaInfo().getSchemaDefinition(), this.msgPayload));
            length = this.msgPayload.length();
        } else {
            byte[] bytes = this.msgPayload.getBytes(StandardCharsets.UTF_8);
            value = newMessage.value(bytes);
            length = bytes.length;
        }
        this.messageSizeHistogram.update(length);
        this.bytesCounter.inc(length);
        if (this.asyncPulsarOp) {
            try {
                CompletableFuture sendAsync = value.sendAsync();
                if (this.useTransaction) {
                    Transaction transaction2 = transaction;
                    sendAsync = sendAsync.thenCompose(obj -> {
                        Timer.Context time = this.transactionCommitTimer.time();
                        return transaction2.commit().whenComplete((r3, th) -> {
                            time.close();
                        }).thenApply(r32 -> {
                            return obj;
                        });
                    });
                }
                sendAsync.whenComplete((obj2, th) -> {
                    if (logger.isDebugEnabled()) {
                        if (PulsarActivityUtil.isAvroSchemaTypeStr(type.name())) {
                            logger.debug("({}) Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={})", this.producer.getProducerName(), this.msgKey, this.msgProperties, AvroUtil.GetGenericRecord_ApacheAvro(getAvroSchemaFromConfiguration(), this.msgPayload).toString());
                        } else {
                            logger.debug("({}) Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={}", this.producer.getProducerName(), this.msgKey, this.msgProperties, this.msgPayload);
                        }
                    }
                    runnable.run();
                }).exceptionally(th2 -> {
                    logger.error("Async message sending failed: key - " + this.msgKey + "; properties - " + this.msgProperties + "; payload - " + this.msgPayload);
                    this.pulsarActivity.asyncOperationFailed(th2);
                    return null;
                });
                return;
            } catch (Exception e) {
                throw new PulsarDriverUnexpectedException(e);
            }
        }
        try {
            logger.trace("Sending message");
            value.send();
            if (this.useTransaction) {
                Timer.Context time = this.transactionCommitTimer.time();
                try {
                    transaction.commit().get();
                    if (time != null) {
                        time.close();
                    }
                } finally {
                }
            }
            if (logger.isDebugEnabled()) {
                if (PulsarActivityUtil.isAvroSchemaTypeStr(type.name())) {
                    logger.debug("({}) Sync message sent: msg-key={}; msg-properties={}; msg-payload={})", this.producer.getProducerName(), this.msgKey, this.msgProperties, AvroUtil.GetGenericRecord_ApacheAvro(getAvroSchemaFromConfiguration(), this.msgPayload).toString());
                } else {
                    logger.debug("({}) Sync message sent; msg-key={}; msg-properties={}; msg-payload={}", this.producer.getProducerName(), this.msgKey, this.msgProperties, this.msgPayload);
                }
            }
            runnable.run();
        } catch (PulsarClientException | InterruptedException | ExecutionException e2) {
            String str = "Sync message sending failed: key - " + this.msgKey + "; properties - " + this.msgProperties + "; payload - " + this.msgPayload;
            logger.trace(str);
            throw new PulsarDriverUnexpectedException(str);
        }
    }

    private org.apache.avro.Schema getAvroSchemaFromConfiguration() {
        if (this.avroSchema == null) {
            if (this.pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
                this.avroSchema = AvroUtil.GetSchema_ApacheAvro(this.pulsarSchema.getValueSchema().getSchemaInfo().getSchemaDefinition());
            } else {
                this.avroSchema = AvroUtil.GetSchema_ApacheAvro(this.pulsarSchema.getSchemaInfo().getSchemaDefinition());
            }
        }
        return this.avroSchema;
    }

    private org.apache.avro.Schema getKeyAvroSchemaFromConfiguration() {
        if (this.avroKeySchema == null) {
            if (this.pulsarSchema.getSchemaInfo().getType() != SchemaType.KEY_VALUE) {
                throw new RuntimeException("We are not using KEY_VALUE schema, so no Schema for the Key!");
            }
            this.avroKeySchema = AvroUtil.GetSchema_ApacheAvro(this.pulsarSchema.getKeySchema().getSchemaInfo().getSchemaDefinition());
        }
        return this.avroKeySchema;
    }
}
