package com.wu.framework.easy.pulsar.listener;

import com.wu.framework.easy.listener.core.ListenerConsumer;
import com.wu.framework.easy.listener.core.SingletonMessageListenerContainer;
import com.wu.framework.easy.listener.core.consumer.ConsumerRecord;
import com.wu.framework.easy.listener.core.consumer.ConsumerRecords;
import com.wu.framework.easy.listener.core.support.Acknowledgment;
import com.wu.framework.easy.pulsar.config.MethodPulsarListenerEndpoint;
import com.wu.framework.easy.pulsar.consumer.PulsarConsumerRecord;
import com.wu.framework.easy.pulsar.consumer.PulsarConsumerRecords;
import com.wu.framework.easy.pulsar.serialization.PulsarRecordSerialization;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:com/wu/framework/easy/pulsar/listener/PulsarSingletonMessageListenerContainer.class */
public class PulsarSingletonMessageListenerContainer implements SingletonMessageListenerContainer {
    private final Logger log = LoggerFactory.getLogger(PulsarSingletonMessageListenerContainer.class);
    private final AsyncListenableTaskExecutor asyncListenableTaskExecutor = new ThreadPoolTaskExecutor();
    protected ListenerConsumer listenerConsumer;
    protected Consumer consumer;
    protected boolean running;
    private String beanName;
    private MethodPulsarListenerEndpoint endpoint;

    /* loaded from: input_file:com/wu/framework/easy/pulsar/listener/PulsarSingletonMessageListenerContainer$ConsumerAcknowledgment.class */
    private final class ConsumerAcknowledgment implements Acknowledgment {
        protected Consumer consumer;
        protected PulsarConsumerRecord record;

        public ConsumerAcknowledgment(Consumer consumer, PulsarConsumerRecord pulsarConsumerRecord) {
            this.consumer = consumer;
            this.record = pulsarConsumerRecord;
        }

        public void acknowledge() {
            try {
                this.consumer.acknowledge(this.record.message());
            } catch (PulsarClientException e) {
                e.printStackTrace();
                PulsarSingletonMessageListenerContainer.this.log.error("ack 消费数据失败" + e.getMessage());
            }
        }
    }

    /* loaded from: input_file:com/wu/framework/easy/pulsar/listener/PulsarSingletonMessageListenerContainer$ConsumerBatchAcknowledgment.class */
    private final class ConsumerBatchAcknowledgment implements Acknowledgment {
        protected Consumer consumer;
        protected PulsarConsumerRecords records;

        public ConsumerBatchAcknowledgment(Consumer consumer, PulsarConsumerRecords pulsarConsumerRecords) {
            this.consumer = consumer;
            this.records = pulsarConsumerRecords;
        }

        public void acknowledge() {
            try {
                this.consumer.acknowledge(this.records.records());
            } catch (PulsarClientException e) {
                e.printStackTrace();
                PulsarSingletonMessageListenerContainer.this.log.error("ack 消费数据失败" + e.getMessage());
            }
        }
    }

    /* loaded from: input_file:com/wu/framework/easy/pulsar/listener/PulsarSingletonMessageListenerContainer$PulsarListenerConsumer.class */
    final class PulsarListenerConsumer implements ListenerConsumer {
        private final PulsarRecordSerialization serialization = new PulsarRecordSerialization();

        PulsarListenerConsumer() {
        }

        public boolean isLongLived() {
            return false;
        }

        public void run() {
            Consumer consumer = PulsarSingletonMessageListenerContainer.this.getConsumer();
            MethodPulsarListenerEndpoint endpoint = PulsarSingletonMessageListenerContainer.this.getEndpoint();
            Class[] parameterTypes = endpoint.getMethod().getParameterTypes();
            Object bean = endpoint.getBean();
            while (PulsarSingletonMessageListenerContainer.this.isRunning()) {
                try {
                } catch (PulsarClientException e) {
                    e.printStackTrace();
                }
                if (!consumer.isConnected()) {
                    return;
                }
                if (ConsumerRecord.class.isAssignableFrom(parameterTypes[0])) {
                    Message receive = consumer.receive();
                    if (!ObjectUtils.isEmpty(receive)) {
                        PulsarConsumerRecord pulsarConsumerRecord = new PulsarConsumerRecord(null, new String(receive.getData()), receive);
                        try {
                            endpoint.getMethod().invoke(bean, invokeArgs(parameterTypes, new Object[]{pulsarConsumerRecord, new ConsumerAcknowledgment(consumer, pulsarConsumerRecord)}));
                        } catch (IllegalAccessException | InvocationTargetException e2) {
                            e2.printStackTrace();
                        }
                    }
                }
                if (ConsumerRecords.class.isAssignableFrom(parameterTypes[0])) {
                    Messages batchReceive = consumer.batchReceive();
                    if (batchReceive.iterator().hasNext()) {
                        try {
                            PulsarConsumerRecords pulsarConsumerRecords = new PulsarConsumerRecords(batchReceive);
                            Stream stream = Arrays.stream(parameterTypes);
                            Class<Acknowledgment> cls = Acknowledgment.class;
                            Objects.requireNonNull(Acknowledgment.class);
                            if (stream.anyMatch(cls::isAssignableFrom)) {
                                endpoint.getMethod().invoke(bean, pulsarConsumerRecords, new ConsumerBatchAcknowledgment(consumer, pulsarConsumerRecords));
                            } else {
                                endpoint.getMethod().invoke(bean, pulsarConsumerRecords);
                            }
                        } catch (IllegalAccessException | InvocationTargetException e3) {
                            e3.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    public void start() {
        this.listenerConsumer = new PulsarListenerConsumer();
        setRunning(true);
        new SimpleAsyncTaskExecutor(getBeanName() + "-C-").submitListenable(this.listenerConsumer);
    }

    public void stop() {
        try {
            getConsumer().close();
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    public void setRunning(boolean z) {
        this.running = z;
    }

    private String getBeanName() {
        return this.beanName;
    }

    public void setBeanName(String str) {
        this.beanName = str;
    }

    public Consumer getConsumer() {
        return this.consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

    public MethodPulsarListenerEndpoint getEndpoint() {
        return this.endpoint;
    }

    public void setEndpoint(MethodPulsarListenerEndpoint methodPulsarListenerEndpoint) {
        this.endpoint = methodPulsarListenerEndpoint;
    }
}
