package com.google.cloud.spring.pubsub.reactive;

import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberOperations;
import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-gcp-pubsub-2.0.6.jar:com/google/cloud/spring/pubsub/reactive/PubSubReactiveFactory.class */
public final class PubSubReactiveFactory {
    private static final Log LOGGER = LogFactory.getLog(PubSubReactiveFactory.class);
    private final PubSubSubscriberOperations subscriberOperations;
    private final Scheduler scheduler;
    private final int maxMessages;

    public PubSubReactiveFactory(PubSubSubscriberOperations pubSubSubscriberOperations, Scheduler scheduler) {
        this(pubSubSubscriberOperations, scheduler, Integer.MAX_VALUE);
    }

    public PubSubReactiveFactory(PubSubSubscriberOperations pubSubSubscriberOperations, Scheduler scheduler, int i) {
        Assert.notNull(pubSubSubscriberOperations, "subscriberOperations cannot be null.");
        Assert.notNull(scheduler, "scheduler cannot be null.");
        if (i < 1) {
            throw new IllegalArgumentException("maxMessages cannot be less than 1.");
        }
        this.subscriberOperations = pubSubSubscriberOperations;
        this.scheduler = scheduler;
        this.maxMessages = i;
    }

    public Flux<AcknowledgeablePubsubMessage> poll(String str, long j) {
        return Flux.create(fluxSink -> {
            fluxSink.onRequest(j2 -> {
                if (j2 == Long.MAX_VALUE) {
                    pollingPull(str, j, fluxSink);
                } else {
                    backpressurePull(str, j2, fluxSink);
                }
            });
        });
    }

    private void pollingPull(String str, long j, FluxSink<AcknowledgeablePubsubMessage> fluxSink) {
        Flux<R> flatMap = Flux.interval(Duration.ZERO, Duration.ofMillis(j), this.scheduler).flatMap(l -> {
            return pullAll(str);
        });
        Objects.requireNonNull(fluxSink);
        Consumer consumer = (v1) -> {
            r1.next(v1);
        };
        Objects.requireNonNull(fluxSink);
        fluxSink.onDispose(flatMap.subscribe(consumer, fluxSink::error));
    }

    private Flux<AcknowledgeablePubsubMessage> pullAll(String str) {
        return Mono.fromFuture(this.subscriberOperations.pullAsync(str, Integer.valueOf(this.maxMessages), true).completable()).flatMapMany((v0) -> {
            return Flux.fromIterable(v0);
        });
    }

    private void backpressurePull(String str, long j, FluxSink<AcknowledgeablePubsubMessage> fluxSink) {
        this.subscriberOperations.pullAsync(str, Integer.valueOf(j > 2147483647L ? Integer.MAX_VALUE : (int) j), false).addCallback(list -> {
            if (!fluxSink.isCancelled()) {
                Objects.requireNonNull(fluxSink);
                list.forEach((v1) -> {
                    r1.next(v1);
                });
            }
            if (fluxSink.isCancelled()) {
                return;
            }
            long size = j - list.size();
            if (size > 0) {
                backpressurePull(str, size, fluxSink);
            }
        }, th -> {
            if (!(th instanceof DeadlineExceededException)) {
                fluxSink.error(th);
                return;
            }
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Blocking pull timed out due to empty subscription " + str + "; retrying.");
            }
            backpressurePull(str, j, fluxSink);
        });
    }
}
