package io.nosqlbench.driver.pulsar;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;

/* loaded from: input_file:io/nosqlbench/driver/pulsar/PulsarSpace.class */
public class PulsarSpace {
    private static final Logger logger = LogManager.getLogger(PulsarSpace.class);
    private final String spaceName;
    private final PulsarActivity pulsarActivity;
    private final ActivityDef activityDef;
    private final PulsarNBClientConf pulsarNBClientConf;
    private final String pulsarSvcUrl;
    private final String webSvcUrl;
    private final PulsarAdmin pulsarAdmin;
    private final PulsarClient pulsarClient;
    private final Schema<?> pulsarSchema;
    private final Timer createTransactionTimer;
    private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
    private final Set<String> pulsarClusterMetadata = new HashSet();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/nosqlbench/driver/pulsar/PulsarSpace$ConsumerGaugeImpl.class */
    public static class ConsumerGaugeImpl implements Gauge<Object> {
        private final Consumer<?> consumer;
        private final Function<ConsumerStats, Object> valueExtractor;

        ConsumerGaugeImpl(Consumer<?> consumer, Function<ConsumerStats, Object> function) {
            this.consumer = consumer;
            this.valueExtractor = function;
        }

        public Object getValue() {
            Object apply;
            synchronized (this.consumer) {
                apply = this.valueExtractor.apply(this.consumer.getStats());
            }
            return apply;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/nosqlbench/driver/pulsar/PulsarSpace$ProducerGaugeImpl.class */
    public static class ProducerGaugeImpl implements Gauge<Object> {
        private final Producer<?> producer;
        private final Function<ProducerStats, Object> valueExtractor;

        ProducerGaugeImpl(Producer<?> producer, Function<ProducerStats, Object> function) {
            this.producer = producer;
            this.valueExtractor = function;
        }

        public Object getValue() {
            Object apply;
            synchronized (this.producer) {
                apply = this.valueExtractor.apply(this.producer.getStats());
            }
            return apply;
        }
    }

    public PulsarSpace(String str, PulsarActivity pulsarActivity) {
        this.spaceName = str;
        this.pulsarActivity = pulsarActivity;
        this.pulsarNBClientConf = pulsarActivity.getPulsarConf();
        this.pulsarSvcUrl = pulsarActivity.getPulsarSvcUrl();
        this.webSvcUrl = pulsarActivity.getWebSvcUrl();
        this.pulsarAdmin = pulsarActivity.getPulsarAdmin();
        this.pulsarClient = pulsarActivity.getPulsarClient();
        this.pulsarSchema = pulsarActivity.getPulsarSchema();
        this.activityDef = pulsarActivity.getActivityDef();
        this.createTransactionTimer = pulsarActivity.getCreateTransactionTimer();
        try {
            CollectionUtils.addAll(this.pulsarClusterMetadata, this.pulsarAdmin.clusters().getClusters().listIterator());
        } catch (PulsarAdminException e) {
            logger.info("Could not get list of Pulsar Clusters from global configuration: " + e.getMessage());
        }
    }

    public PulsarNBClientConf getPulsarClientConf() {
        return this.pulsarNBClientConf;
    }

    public PulsarAdmin getPulsarAdmin() {
        return this.pulsarAdmin;
    }

    public PulsarClient getPulsarClient() {
        return this.pulsarClient;
    }

    public Schema<?> getPulsarSchema() {
        return this.pulsarSchema;
    }

    public String getPulsarSvcUrl() {
        return this.pulsarSvcUrl;
    }

    public String getWebSvcUrl() {
        return this.webSvcUrl;
    }

    public Set<String> getPulsarClusterMetadata() {
        return this.pulsarClusterMetadata;
    }

    public void shutdownPulsarSpace() {
        try {
            for (Producer<?> producer : this.producers.values()) {
                if (producer != null) {
                    producer.close();
                }
            }
            for (Consumer<?> consumer : this.consumers.values()) {
                if (consumer != null) {
                    consumer.close();
                }
            }
            for (Reader<?> reader : this.readers.values()) {
                if (reader != null) {
                    reader.close();
                }
            }
            if (this.pulsarAdmin != null) {
                this.pulsarAdmin.close();
            }
            if (this.pulsarClient != null) {
                this.pulsarClient.close();
            }
        } catch (Exception e) {
            throw new RuntimeException("Unexpected error when closing Pulsar objects!");
        }
    }

    private String getPulsarAPIMetricsPrefix(String str, String str2, String str3) {
        String str4;
        if (!PulsarActivityUtil.isValidPulsarApiType(str)) {
            throw new RuntimeException("Incorrect Pulsar API type. Valid type list: " + PulsarActivityUtil.getValidPulsarApiTypeList());
        }
        if (StringUtils.isBlank(str2)) {
            String str5 = str;
            if (str.equalsIgnoreCase(PulsarActivityUtil.PULSAR_API_TYPE.PRODUCER.label)) {
                str5 = str5 + this.producers.size();
            } else if (str.equalsIgnoreCase(PulsarActivityUtil.PULSAR_API_TYPE.CONSUMER.label)) {
                str5 = str5 + this.consumers.size();
            } else if (str.equalsIgnoreCase(PulsarActivityUtil.PULSAR_API_TYPE.READER.label)) {
                str5 = str5 + this.readers.size();
            }
            str4 = str5 + "_";
        } else {
            str4 = str2 + "_";
        }
        return (str4 + str3 + "_").replace("persistent://public/default/", "").replace("non-persistent://", "").replace("persistent://", "").replace("/", "_");
    }

    static Gauge<Object> producerSafeExtractMetric(Producer<?> producer, Function<ProducerStats, Object> function) {
        return new ProducerGaugeImpl(producer, function);
    }

    private String getEffectiveProducerName(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String producerName = this.pulsarNBClientConf.getProducerName();
        return !StringUtils.isBlank(producerName) ? producerName : "";
    }

    public Supplier<Transaction> getTransactionSupplier() {
        PulsarClient pulsarClient = getPulsarClient();
        return () -> {
            try {
                try {
                    Timer.Context time = this.createTransactionTimer.time();
                    try {
                        Transaction transaction = (Transaction) pulsarClient.newTransaction().build().get();
                        if (time != null) {
                            time.close();
                        }
                        return transaction;
                    } catch (Throwable th) {
                        if (time != null) {
                            try {
                                time.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (InterruptedException | ExecutionException e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Error while starting a new transaction", e);
                    }
                    throw new RuntimeException(e);
                }
            } catch (PulsarClientException e2) {
                throw new RuntimeException("Transactions are not enabled on Pulsar Client, please set client.enableTransaction=true in your Pulsar Client configuration");
            }
        };
    }

    private String getEffectiveProducerTopicName(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String producerTopicName = this.pulsarNBClientConf.getProducerTopicName();
        if (StringUtils.isBlank(producerTopicName)) {
            throw new RuntimeException("Producer topic name must be set at either global level or cycle level!");
        }
        return producerTopicName;
    }

    public Producer<?> getProducer(String str, String str2) {
        String effectiveProducerTopicName = getEffectiveProducerTopicName(str);
        String effectiveProducerName = getEffectiveProducerName(str2);
        if (StringUtils.isBlank(effectiveProducerTopicName)) {
            throw new RuntimeException("Producer:: must specify a topic name");
        }
        String buildCacheKey = PulsarActivityUtil.buildCacheKey(effectiveProducerName, effectiveProducerTopicName);
        Producer<?> producer = this.producers.get(buildCacheKey);
        if (producer == null) {
            PulsarClient pulsarClient = getPulsarClient();
            Map<String, Object> producerConfMap = this.pulsarNBClientConf.getProducerConfMap();
            producerConfMap.remove(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label);
            producerConfMap.remove(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label);
            String pulsarAPIMetricsPrefix = getPulsarAPIMetricsPrefix(PulsarActivityUtil.PULSAR_API_TYPE.PRODUCER.label, effectiveProducerName, effectiveProducerTopicName);
            try {
                ProducerBuilder producerBuilder = pulsarClient.newProducer(this.pulsarSchema).loadConf(producerConfMap).topic(effectiveProducerTopicName);
                if (!StringUtils.isAnyBlank(new CharSequence[]{effectiveProducerName})) {
                    producerBuilder = producerBuilder.producerName(effectiveProducerName);
                }
                producer = producerBuilder.create();
                this.producers.put(buildCacheKey, producer);
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "total_bytes_sent", producerSafeExtractMetric(producer, producerStats -> {
                    return Long.valueOf(producerStats.getTotalBytesSent() + producerStats.getNumBytesSent());
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "total_msg_sent", producerSafeExtractMetric(producer, producerStats2 -> {
                    return Long.valueOf(producerStats2.getTotalMsgsSent() + producerStats2.getNumMsgsSent());
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "total_send_failed", producerSafeExtractMetric(producer, producerStats3 -> {
                    return Long.valueOf(producerStats3.getTotalSendFailed() + producerStats3.getNumSendFailed());
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "total_ack_received", producerSafeExtractMetric(producer, producerStats4 -> {
                    return Long.valueOf(producerStats4.getTotalAcksReceived() + producerStats4.getNumAcksReceived());
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "send_bytes_rate", producerSafeExtractMetric(producer, (v0) -> {
                    return v0.getSendBytesRate();
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "send_msg_rate", producerSafeExtractMetric(producer, (v0) -> {
                    return v0.getSendMsgsRate();
                }));
            } catch (PulsarClientException e) {
                throw new RuntimeException("Unable to create a Pulsar producer!", e);
            }
        }
        return producer;
    }

    static Gauge<Object> consumerSafeExtractMetric(Consumer<?> consumer, Function<ConsumerStats, Object> function) {
        return new ConsumerGaugeImpl(consumer, function);
    }

    private String getEffectiveSubscriptionName(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String consumerSubscriptionName = this.pulsarNBClientConf.getConsumerSubscriptionName();
        if (StringUtils.isBlank(consumerSubscriptionName)) {
            throw new RuntimeException("Consumer::Subscription name must be set at either global level or cycle level!");
        }
        return consumerSubscriptionName;
    }

    private String getEffectiveSubscriptionTypeStr(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String consumerSubscriptionType = this.pulsarNBClientConf.getConsumerSubscriptionType();
        return !StringUtils.isBlank(consumerSubscriptionType) ? consumerSubscriptionType : "";
    }

    private SubscriptionType getEffectiveSubscriptionType(String str) {
        String effectiveSubscriptionTypeStr = getEffectiveSubscriptionTypeStr(str);
        SubscriptionType subscriptionType = SubscriptionType.Exclusive;
        if (!StringUtils.isBlank(effectiveSubscriptionTypeStr)) {
            if (!PulsarActivityUtil.isValidSubscriptionType(effectiveSubscriptionTypeStr)) {
                throw new RuntimeException("Consumer::Invalid subscription type (\"" + effectiveSubscriptionTypeStr + "\"). \nValid subscription types: " + PulsarActivityUtil.getValidSubscriptionTypeList());
            }
            subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionTypeStr);
        }
        return subscriptionType;
    }

    private String getEffectiveConsumerName(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String consumerName = this.pulsarNBClientConf.getConsumerName();
        return !StringUtils.isBlank(consumerName) ? consumerName : "";
    }

    public Consumer<?> getConsumer(String str, String str2, String str3, String str4, String str5) {
        String effectiveSubscriptionName = getEffectiveSubscriptionName(str2);
        SubscriptionType effectiveSubscriptionType = getEffectiveSubscriptionType(str3);
        String effectiveConsumerName = getEffectiveConsumerName(str4);
        if (StringUtils.isAnyBlank(new CharSequence[]{str, effectiveSubscriptionName})) {
            throw new RuntimeException("Consumer:: must specify a topic name and a subscription name");
        }
        return this.consumers.computeIfAbsent(PulsarActivityUtil.buildCacheKey(effectiveConsumerName, effectiveSubscriptionName, str), str6 -> {
            PulsarClient pulsarClient = getPulsarClient();
            HashMap hashMap = new HashMap(this.pulsarNBClientConf.getConsumerConfMap());
            hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
            hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
            hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label);
            hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label);
            hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label);
            hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_CUSTOM_KEY.timeout.label);
            try {
                ConsumerBuilder subscriptionType = pulsarClient.newConsumer(this.pulsarSchema).loadConf(hashMap).topic(new String[]{str}).subscriptionName(effectiveSubscriptionName).subscriptionType(effectiveSubscriptionType);
                if (effectiveSubscriptionType == SubscriptionType.Key_Shared) {
                    KeySharedPolicy.KeySharedPolicySticky autoSplitHashRange = KeySharedPolicy.autoSplitHashRange();
                    if (str5 != null && !str5.isEmpty()) {
                        Range[] parseRanges = parseRanges(str5);
                        logger.info("Configuring KeySharedPolicy#stickyHashRange with ranges {}", parseRanges);
                        autoSplitHashRange = KeySharedPolicy.stickyHashRange().ranges(parseRanges);
                    }
                    subscriptionType.keySharedPolicy(autoSplitHashRange);
                }
                if (!StringUtils.isBlank(effectiveConsumerName)) {
                    subscriptionType = subscriptionType.consumerName(effectiveConsumerName);
                }
                Consumer subscribe = subscriptionType.subscribe();
                String pulsarAPIMetricsPrefix = getPulsarAPIMetricsPrefix(PulsarActivityUtil.PULSAR_API_TYPE.CONSUMER.label, effectiveConsumerName, str);
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "total_bytes_recv", consumerSafeExtractMetric(subscribe, consumerStats -> {
                    return Long.valueOf(consumerStats.getTotalBytesReceived() + consumerStats.getNumBytesReceived());
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "total_msg_recv", consumerSafeExtractMetric(subscribe, consumerStats2 -> {
                    return Long.valueOf(consumerStats2.getTotalMsgsReceived() + consumerStats2.getNumMsgsReceived());
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "total_recv_failed", consumerSafeExtractMetric(subscribe, consumerStats3 -> {
                    return Long.valueOf(consumerStats3.getTotalReceivedFailed() + consumerStats3.getNumReceiveFailed());
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "total_acks_sent", consumerSafeExtractMetric(subscribe, consumerStats4 -> {
                    return Long.valueOf(consumerStats4.getTotalAcksSent() + consumerStats4.getNumAcksSent());
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "recv_bytes_rate", consumerSafeExtractMetric(subscribe, (v0) -> {
                    return v0.getRateBytesReceived();
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "recv_msg_rate", consumerSafeExtractMetric(subscribe, (v0) -> {
                    return v0.getRateMsgsReceived();
                }));
                return subscribe;
            } catch (PulsarClientException e) {
                logger.error("Error creating a consumer", e);
                throw new RuntimeException("Unable to create a Pulsar consumer!");
            }
        });
    }

    private static Range[] parseRanges(String str) {
        if (str == null || str.isEmpty()) {
            return new Range[0];
        }
        String[] split = str.split(",");
        Range[] rangeArr = new Range[split.length];
        for (int i = 0; i < split.length; i++) {
            String str2 = split[i];
            int indexOf = str2.indexOf("..");
            if (indexOf <= 0) {
                throw new IllegalArgumentException("Invalid range '" + str2 + "'");
            }
            try {
                rangeArr[i] = Range.of(Integer.parseInt(str2.substring(0, indexOf)), Integer.parseInt(str2.substring(indexOf + 2)));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid range '" + str2 + "'");
            }
        }
        return rangeArr;
    }

    private String getEffectiveConsumerTopicNameListStr(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String consumerTopicNames = this.pulsarNBClientConf.getConsumerTopicNames();
        return !StringUtils.isBlank(consumerTopicNames) ? consumerTopicNames : "";
    }

    private List<String> getEffectiveConsumerTopicNameList(String str) {
        String[] split = getEffectiveConsumerTopicNameListStr(str).split("[;,]");
        ArrayList arrayList = new ArrayList();
        for (String str2 : split) {
            if (!StringUtils.isBlank(str2)) {
                arrayList.add(str2.trim());
            }
        }
        return arrayList;
    }

    private String getEffectiveConsumerTopicPatternStr(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String consumerTopicPattern = this.pulsarNBClientConf.getConsumerTopicPattern();
        return !StringUtils.isBlank(consumerTopicPattern) ? consumerTopicPattern : "";
    }

    private Pattern getEffectiveConsumerTopicPattern(String str) {
        Pattern pattern;
        String effectiveConsumerTopicPatternStr = getEffectiveConsumerTopicPatternStr(str);
        try {
            pattern = !StringUtils.isBlank(effectiveConsumerTopicPatternStr) ? Pattern.compile(effectiveConsumerTopicPatternStr) : null;
        } catch (PatternSyntaxException e) {
            pattern = null;
        }
        return pattern;
    }

    public Consumer<?> getMultiTopicConsumer(String str, String str2, String str3, String str4, String str5, String str6) {
        List<String> effectiveConsumerTopicNameList = getEffectiveConsumerTopicNameList(str2);
        String effectiveConsumerTopicPatternStr = getEffectiveConsumerTopicPatternStr(str3);
        Pattern effectiveConsumerTopicPattern = getEffectiveConsumerTopicPattern(str3);
        String effectiveSubscriptionName = getEffectiveSubscriptionName(str4);
        SubscriptionType effectiveSubscriptionType = getEffectiveSubscriptionType(str5);
        String effectiveConsumerName = getEffectiveConsumerName(str6);
        if (effectiveSubscriptionType.equals(SubscriptionType.Exclusive) && this.activityDef.getThreads() > 1) {
            throw new RuntimeException("Consumer:: trying to create multiple consumers of \"Exclusive\" subscription type under the same subscription name to the same topic!");
        }
        if (StringUtils.isBlank(str) && effectiveConsumerTopicNameList.isEmpty() && effectiveConsumerTopicPattern == null) {
            throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!");
        }
        String join = !effectiveConsumerTopicNameList.isEmpty() ? String.join("|", effectiveConsumerTopicNameList) : effectiveConsumerTopicPattern != null ? effectiveConsumerTopicPatternStr : str;
        String buildCacheKey = PulsarActivityUtil.buildCacheKey(effectiveConsumerName, effectiveSubscriptionName, join);
        Consumer<?> consumer = this.consumers.get(buildCacheKey);
        if (consumer == null) {
            PulsarClient pulsarClient = getPulsarClient();
            HashMap hashMap = new HashMap(this.pulsarNBClientConf.getConsumerConfMap());
            hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
            hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
            hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label);
            hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label);
            hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label);
            hashMap.remove(PulsarActivityUtil.CONSUMER_CONF_CUSTOM_KEY.timeout.label);
            try {
                ConsumerBuilder consumerName = pulsarClient.newConsumer(this.pulsarSchema).loadConf(hashMap).subscriptionName(effectiveSubscriptionName).subscriptionType(effectiveSubscriptionType).consumerName(effectiveConsumerName);
                consumer = (!effectiveConsumerTopicNameList.isEmpty() ? consumerName.topics(effectiveConsumerTopicNameList) : effectiveConsumerTopicPattern != null ? consumerName.topicsPattern(effectiveConsumerTopicPattern) : consumerName.topic(new String[]{str})).subscribe();
                String pulsarAPIMetricsPrefix = getPulsarAPIMetricsPrefix(PulsarActivityUtil.PULSAR_API_TYPE.PRODUCER.label, effectiveConsumerName, join);
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "totalBytesRecvd", consumerSafeExtractMetric(consumer, consumerStats -> {
                    return Long.valueOf(consumerStats.getTotalBytesReceived() + consumerStats.getNumBytesReceived());
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "totalMsgsRecvd", consumerSafeExtractMetric(consumer, consumerStats2 -> {
                    return Long.valueOf(consumerStats2.getTotalMsgsReceived() + consumerStats2.getNumMsgsReceived());
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "totalRecvdFailed", consumerSafeExtractMetric(consumer, consumerStats3 -> {
                    return Long.valueOf(consumerStats3.getTotalReceivedFailed() + consumerStats3.getNumReceiveFailed());
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "totalAcksSent", consumerSafeExtractMetric(consumer, consumerStats4 -> {
                    return Long.valueOf(consumerStats4.getTotalAcksSent() + consumerStats4.getNumAcksSent());
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "recvdBytesRate", consumerSafeExtractMetric(consumer, (v0) -> {
                    return v0.getRateBytesReceived();
                }));
                ActivityMetrics.gauge(this.activityDef, pulsarAPIMetricsPrefix + "recvdMsgsRate", consumerSafeExtractMetric(consumer, (v0) -> {
                    return v0.getRateMsgsReceived();
                }));
                this.consumers.put(buildCacheKey, consumer);
            } catch (PulsarClientException e) {
                e.printStackTrace();
                throw new RuntimeException("Unable to create a Pulsar consumer!");
            }
        }
        return consumer;
    }

    private String getEffectiveReaderTopicName(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String readerTopicName = this.pulsarNBClientConf.getReaderTopicName();
        if (StringUtils.isBlank(readerTopicName)) {
            throw new RuntimeException("Reader:: Reader topic name must be set at either global level or cycle level!");
        }
        return readerTopicName;
    }

    private String getEffectiveReaderName(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String consumerName = this.pulsarNBClientConf.getConsumerName();
        return !StringUtils.isBlank(consumerName) ? consumerName : "";
    }

    private String getEffectiveStartMsgPosStr(String str) {
        if (!StringUtils.isBlank(str)) {
            return str;
        }
        String startMsgPosStr = this.pulsarNBClientConf.getStartMsgPosStr();
        return !StringUtils.isBlank(startMsgPosStr) ? startMsgPosStr : PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label;
    }

    public Reader<?> getReader(String str, String str2, String str3) {
        String effectiveReaderTopicName = getEffectiveReaderTopicName(str);
        String effectiveReaderName = getEffectiveReaderName(str2);
        String effectiveStartMsgPosStr = getEffectiveStartMsgPosStr(str3);
        if (!PulsarActivityUtil.isValideReaderStartPosition(effectiveStartMsgPosStr)) {
            throw new RuntimeException("Reader:: Invalid value for reader start message position!");
        }
        String buildCacheKey = PulsarActivityUtil.buildCacheKey(effectiveReaderTopicName, effectiveReaderName, effectiveStartMsgPosStr);
        Reader<?> reader = this.readers.get(buildCacheKey);
        if (reader == null) {
            PulsarClient pulsarClient = getPulsarClient();
            Map<String, Object> readerConfMap = this.pulsarNBClientConf.getReaderConfMap();
            readerConfMap.remove(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.label);
            readerConfMap.remove(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.label);
            readerConfMap.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label);
            try {
                ReaderBuilder readerName = pulsarClient.newReader(this.pulsarSchema).loadConf(readerConfMap).topic(effectiveReaderTopicName).readerName(effectiveReaderName);
                MessageId messageId = MessageId.latest;
                if (effectiveStartMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) {
                    messageId = MessageId.earliest;
                }
                reader = readerName.startMessageId(messageId).create();
                this.readers.put(buildCacheKey, reader);
            } catch (PulsarClientException e) {
                e.printStackTrace();
                throw new RuntimeException("Unable to create a Pulsar reader!");
            }
        }
        return reader;
    }
}
