package org.reactivestreams.tck;

import java.util.HashSet;
import java.util.Set;
import org.reactivestreams.api.Processor;
import org.reactivestreams.spi.Publisher;
import org.reactivestreams.spi.Subscriber;
import org.reactivestreams.spi.Subscription;
import org.reactivestreams.tck.SubscriberVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.Test;

/* loaded from: input_file:org/reactivestreams/tck/IdentityProcessorVerification.class */
public abstract class IdentityProcessorVerification<T> {
    /** Test environment supplied to the constructor; also passed to both delegate verifications and to each {@code TestSetup}. */
    private final TestEnvironment env;
    /** Delegate for the subscriber-side tests; its factory methods forward to this class. */
    private final SubscriberVerification<T> subscriberVerification;
    /** Delegate for the publisher-side tests; its factory methods forward to this class. */
    private final PublisherVerification<T> publisherVerification;
    /** Buffer size passed to {@link #createIdentityProcessor(int)} whenever this class creates a processor. */
    private final int testBufferSize;

    /* loaded from: input_file:org/reactivestreams/tck/IdentityProcessorVerification$ManualSubscriberWithErrorCollection.class */
    private class ManualSubscriberWithErrorCollection<A> extends TestEnvironment.ManualSubscriberWithSubscriptionSupport<A> {
        TestEnvironment.Promise<Throwable> error;

        public ManualSubscriberWithErrorCollection(TestEnvironment testEnvironment) {
            super(testEnvironment);
            this.error = new TestEnvironment.Promise<>(testEnvironment);
        }

        @Override // org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport, org.reactivestreams.tck.TestEnvironment.TestSubscriber
        public void onError(Throwable th) {
            this.error.complete(th);
        }

        public void expectError(Throwable th) throws InterruptedException {
            expectError(th, this.env.defaultTimeoutMillis());
        }

        public void expectError(Throwable th, long j) throws InterruptedException {
            this.error.expectCompletion(j, "Did not receive expected error on downstream");
            if (this.error.value().equals(th)) {
                return;
            }
            this.env.flop("Expected error " + th + " but got " + this.error.value());
        }
    }

    /* loaded from: input_file:org/reactivestreams/tck/IdentityProcessorVerification$TestSetup.class */
    /**
     * Test fixture: a manual upstream publisher that, on construction, is subscribed to the
     * subscriber side of a newly created identity processor. Test elements are pulled from a
     * manual subscriber attached to {@code createHelperPublisher(0)}.
     */
    abstract class TestSetup extends TestEnvironment.ManualPublisher<T> {
        /** Subscriber on the helper publisher; source of the elements returned by {@link #nextT()}. */
        private final TestEnvironment.ManualSubscriber<T> tees;
        /** Elements already returned by {@link #nextT()}, kept to detect duplicates. */
        private final Set<T> seenTees = new HashSet<>();
        /** Processor under test, created with the buffer size given to the constructor. */
        final Processor<T, T> processor;
        /** Buffer size the processor was created with. */
        final int testBufferSize;

        /**
         * @param testEnvironment environment passed to the superclass and used to create the helper subscriber
         * @param i               buffer size passed to {@code createIdentityProcessor}
         * @throws InterruptedException if interrupted while setting up the subscriptions
         */
        public TestSetup(TestEnvironment testEnvironment, int i) throws InterruptedException {
            super(testEnvironment);
            this.testBufferSize = i;
            this.tees = testEnvironment.newManualSubscriber(IdentityProcessorVerification.this.createHelperPublisher(0));
            this.processor = IdentityProcessorVerification.this.createIdentityProcessor(i);
            subscribe(this.processor.getSubscriber());
        }

        /**
         * Subscribes a new manual subscriber to the publisher side of the processor.
         *
         * @return the newly subscribed manual subscriber
         * @throws InterruptedException if interrupted while subscribing
         */
        public TestEnvironment.ManualSubscriber<T> newSubscriber() throws InterruptedException {
            return this.env.newManualSubscriber(this.processor.getPublisher());
        }

        /**
         * Requests the next element from the helper publisher and reports a failure through the
         * environment if that element has been returned before.
         *
         * @return the next element of the helper stream
         * @throws InterruptedException if interrupted while waiting for the element
         */
        public T nextT() throws InterruptedException {
            final T element = this.tees.requestNextElement();
            // Set.add returns false when the element was already present.
            if (!this.seenTees.add(element)) {
                this.env.flop("Helper publisher illegally produced the same element " + element + " twice");
            }
            return element;
        }

        /**
         * Waits for the next element on {@code manualSubscriber} and reports a failure through the
         * environment if it does not equal {@code t}.
         *
         * @param manualSubscriber downstream subscriber to read from
         * @param t                the element expected next
         * @throws InterruptedException if interrupted while waiting for the element
         */
        public void expectNextElement(TestEnvironment.ManualSubscriber<T> manualSubscriber, T t) throws InterruptedException {
            final T received = manualSubscriber.nextElement("timeout while awaiting " + t);
            // Null-safe comparison: a null element must be reported via flop, not surface as an NPE.
            final boolean matches = received == null ? t == null : received.equals(t);
            if (!matches) {
                this.env.flop("Received `onNext(" + received + ")` on downstream but expected `onNext(" + t + ")`");
            }
        }

        /**
         * Takes the next helper element and sends it to the processor via {@code sendNext}.
         *
         * @return the element that was sent
         * @throws InterruptedException if interrupted while obtaining the element
         */
        public T sendNextTFromUpstream() throws InterruptedException {
            final T element = nextT();
            sendNext(element);
            return element;
        }
    }

    public IdentityProcessorVerification(TestEnvironment testEnvironment, long j) {
        this(testEnvironment, j, 16);
    }

    /**
     * Creates a verification and wires up the two delegate verifications so that each of their
     * factory methods forwards to the corresponding method of this class.
     *
     * @param testEnvironment environment shared by this class and both delegates
     * @param j               passed unchanged as the second constructor argument of
     *                        {@code PublisherVerification}; NOTE(review): presumably a timeout in
     *                        milliseconds - confirm against that class
     * @param i               buffer size used for every processor created by this class
     */
    public IdentityProcessorVerification(TestEnvironment testEnvironment, long j, int i) {
        env = testEnvironment;
        testBufferSize = i;

        // Subscriber-side tests operate on the subscriber half of the processor.
        subscriberVerification = new SubscriberVerification<T>(testEnvironment) {
            @Override
            Subscriber<T> createSubscriber(SubscriberVerification.SubscriberProbe<T> probe) {
                return IdentityProcessorVerification.this.createSubscriber(probe);
            }

            @Override
            Publisher<T> createHelperPublisher(int elements) {
                return IdentityProcessorVerification.this.createHelperPublisher(elements);
            }
        };

        // Publisher-side tests operate on the publisher half of the processor.
        publisherVerification = new PublisherVerification<T>(testEnvironment, j) {
            @Override
            public Publisher<T> createPublisher(int elements) {
                return IdentityProcessorVerification.this.createPublisher(elements);
            }

            @Override
            public Publisher<T> createCompletedStatePublisher() {
                return IdentityProcessorVerification.this.createCompletedStatePublisher();
            }

            @Override
            public Publisher<T> createErrorStatePublisher() {
                return IdentityProcessorVerification.this.createErrorStatePublisher();
            }
        };
    }

    /**
     * Creates the processor to be verified.
     *
     * @param i buffer size; within this class the configured test buffer size is always passed
     * @return the processor under test (NOTE(review): each caller appears to expect an instance of
     *         its own - confirm)
     */
    public abstract Processor<T, T> createIdentityProcessor(int i);

    /**
     * Creates a publisher that supplies the elements used by the tests. {@code TestSetup} reports a
     * failure if this publisher ever produces the same element twice.
     *
     * @param i NOTE(review): presumably the number of elements to produce; {@code TestSetup} passes
     *          {@code 0} and then pulls elements on demand, so {@code 0} likely means "unbounded" -
     *          confirm
     * @return the helper publisher
     */
    public abstract Publisher<T> createHelperPublisher(int i);

    /**
     * Supplies the publisher returned by the publisher-side delegate's
     * {@code createCompletedStatePublisher()}.
     *
     * @return NOTE(review): presumably a publisher that is already in its completed state - confirm
     */
    public abstract Publisher<T> createCompletedStatePublisher();

    /**
     * Supplies the publisher returned by the publisher-side delegate's
     * {@code createErrorStatePublisher()}.
     *
     * @return NOTE(review): presumably a publisher that is already in its error state - confirm
     */
    public abstract Publisher<T> createErrorStatePublisher();

    /**
     * Builds the publisher used by the publisher-side tests: a new identity processor is subscribed
     * to {@code createHelperPublisher(i)} and its publisher side is returned.
     *
     * @param i argument passed on to {@link #createHelperPublisher(int)}
     * @return the publisher side of the newly created processor
     */
    public Publisher<T> createPublisher(int i) {
        final Processor<T, T> processor = createIdentityProcessor(testBufferSize);
        final Publisher<T> upstream = createHelperPublisher(i);
        upstream.subscribe(processor.getSubscriber());
        return processor.getPublisher();
    }

    /**
     * Requests {@code Integer.MAX_VALUE} twice plus 2 from a single subscriber and then checks that
     * two elements sent from upstream, followed by completion, still reach it.
     *
     * @throws Exception if the fixture or any expectation fails
     */
    @Test
    public void mustSupportAPendingElementCountUpToLongMaxValue() throws Exception {
        new TestSetup(this.env, this.testBufferSize) {
            {
                final TestEnvironment.ManualSubscriber<T> sub = newSubscriber();
                // The total demand exceeds the int range: 2 * Integer.MAX_VALUE + 2 wraps to 0 in
                // 32-bit arithmetic, so a processor tracking demand in an unprotected int would
                // deliver nothing below.
                sub.requestMore(Integer.MAX_VALUE);
                sub.requestMore(Integer.MAX_VALUE);
                sub.requestMore(2);

                expectNextElement(sub, sendNextTFromUpstream());
                expectNextElement(sub, sendNextTFromUpstream());

                sendCompletion();
                sub.expectCompletion(this.env.defaultTimeoutMillis());
                this.env.verifyNoAsyncErrors();
            }
        };
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
        publisherVerification.createPublisher3MustProduceAStreamOfExactly3Elements();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void mustCallOnCompleteOnASubscriberAfterHavingProducedTheFinalStreamElementToIt() throws Throwable {
        publisherVerification.mustCallOnCompleteOnASubscriberAfterHavingProducedTheFinalStreamElementToIt();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void mustStartProducingWithTheOldestStillAvailableElementForASubscriber() {
        publisherVerification.mustStartProducingWithTheOldestStillAvailableElementForASubscriber();
    }

    /**
     * Subscribes two error-collecting subscribers, delivers one element to the first, then sends an
     * error from upstream and checks that both subscribers receive exactly that error.
     *
     * @throws Exception if the fixture or any expectation fails
     */
    @Test
    public void mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Exception {
        new TestSetup(this.env, this.testBufferSize) {
            {
                final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<>(this.env);
                this.env.subscribe(this.processor.getPublisher(), sub1);
                final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<>(this.env);
                this.env.subscribe(this.processor.getPublisher(), sub2);

                sub1.requestMore(1);
                expectRequestMore();
                expectNextElement(sub1, sendNextTFromUpstream());
                sub1.requestMore(1);

                // At this point sub1 has received one element and requested one more;
                // sub2 has not requested anything.
                final RuntimeException failure = new RuntimeException("Test exception");
                sendError(failure);
                sub1.expectError(failure);
                sub2.expectError(failure);

                this.env.verifyNoAsyncErrors();
            }
        };
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void mustNotCallOnCompleteOrOnErrorMoreThanOncePerSubscriber() {
        publisherVerification.mustNotCallOnCompleteOrOnErrorMoreThanOncePerSubscriber();
    }

    /**
     * Builds the subscriber used by the subscriber-side tests: the subscriber half of a new identity
     * processor, whose publisher half is observed by a subscriber that reports every signal it
     * receives to {@code subscriberProbe}.
     *
     * @param subscriberProbe probe notified of the signals arriving downstream of the processor
     * @return the subscriber side of the newly created processor
     */
    public Subscriber<T> createSubscriber(final SubscriberVerification.SubscriberProbe<T> subscriberProbe) {
        final Processor<T, T> processor = createIdentityProcessor(testBufferSize);

        final Subscriber<T> reportingSubscriber = new Subscriber<T>() {
            @Override
            public void onSubscribe(final Subscription subscription) {
                // The puppet lets the verification drive the downstream subscription.
                final SubscriberVerification.SubscriberPuppet puppet = new SubscriberVerification.SubscriberPuppet() {
                    @Override
                    public void triggerShutdown() {
                        subscription.cancel();
                    }

                    @Override
                    public void triggerRequestMore(int elements) {
                        subscription.requestMore(elements);
                    }

                    @Override
                    public void triggerCancel() {
                        subscription.cancel();
                    }
                };
                subscriberProbe.registerOnSubscribe(puppet);
            }

            @Override
            public void onNext(T element) {
                subscriberProbe.registerOnNext(element);
            }

            @Override
            public void onComplete() {
                subscriberProbe.registerOnComplete();
            }

            @Override
            public void onError(Throwable cause) {
                subscriberProbe.registerOnError(cause);
            }
        };

        processor.getPublisher().subscribe(reportingSubscriber);
        // The verification is run against the subscriber side of the processor.
        return processor.getSubscriber();
    }

    /**
     * Cancels the only downstream subscriber and then expects the upstream fixture to observe a
     * cancellation.
     *
     * @throws Exception if the fixture or any expectation fails
     */
    @Test
    public void mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasBeenCancelled() throws Exception {
        new TestSetup(env, testBufferSize) {
            {
                final TestEnvironment.ManualSubscriber<T> onlySubscriber = newSubscriber();
                onlySubscriber.cancel();
                expectCancelling();
                this.env.verifyNoAsyncErrors();
            }
        };
    }

    /**
     * Sends an error from upstream without any prior demand and checks that the downstream
     * subscriber receives exactly that error.
     *
     * @throws Exception if the fixture or any expectation fails
     */
    @Test
    public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
        new TestSetup(this.env, this.testBufferSize) {
            {
                final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<>(this.env);
                this.env.subscribe(this.processor.getPublisher(), sub);

                final RuntimeException failure = new RuntimeException("Test exception");
                sendError(failure);
                sub.expectError(failure);

                this.env.verifyNoAsyncErrors();
            }
        };
    }

    /**
     * Sends two elements from upstream before the downstream subscriber has requested anything,
     * checks that nothing is delivered meanwhile, and then expects both elements in order once two
     * are requested.
     *
     * @throws Exception if the fixture or any expectation fails
     */
    @Test
    public void mustBePreparedToReceiveIncomingElementsFromItsUpstreamEvenIfADownstreamSubscriberHasNotRequestedYet() throws Exception {
        new TestSetup(this.env, this.testBufferSize) {
            {
                final TestEnvironment.ManualSubscriber<T> sub = newSubscriber();

                final T first = sendNextTFromUpstream();
                sub.expectNone(50L);
                final T second = sendNextTFromUpstream();
                sub.expectNone(50L);

                sub.requestMore(2);
                sub.expectNext(first);
                sub.expectNext(second);

                sendCompletion();
                sub.expectCompletion(this.env.defaultTimeoutMillis());
                this.env.verifyNoAsyncErrors();
            }
        };
    }

    /** Forwards to the same-named test of the wrapped {@code SubscriberVerification}. */
    @Test
    public void exerciseHappyPath() throws InterruptedException {
        subscriberVerification.exerciseHappyPath();
    }

    /** Forwards to the same-named test of the wrapped {@code SubscriberVerification}. */
    @Test
    public void onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent() {
        subscriberVerification.onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent();
    }

    /** Forwards to the same-named test of the wrapped {@code SubscriberVerification}. */
    @Test
    public void onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent() {
        subscriberVerification.onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent();
    }

    /** Forwards to the same-named test of the wrapped {@code SubscriberVerification}. */
    @Test
    public void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription() throws InterruptedException {
        subscriberVerification.mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription();
    }

    /** Forwards to the same-named test of the wrapped {@code SubscriberVerification}. */
    @Test
    public void mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription() throws InterruptedException {
        subscriberVerification.mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription();
    }

    /** Forwards to the same-named test of the wrapped {@code SubscriberVerification}. */
    @Test
    public void mustEnsureThatAllCallsOnASubscriptionTakePlaceFromTheSameThreadOrProvideExternalSync() {
        subscriberVerification.mustEnsureThatAllCallsOnASubscriptionTakePlaceFromTheSameThreadOrProvideExternalSync();
    }

    /** Forwards to the same-named test of the wrapped {@code SubscriberVerification}. */
    @Test
    public void mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCancel() throws InterruptedException {
        subscriberVerification.mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCancel();
    }

    /** Forwards to the same-named test of the wrapped {@code SubscriberVerification}. */
    @Test
    public void mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
        subscriberVerification.mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMore();
    }

    /** Forwards to the same-named test of the wrapped {@code SubscriberVerification}. */
    @Test
    public void mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
        subscriberVerification.mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionRequestMore();
    }

    /** Forwards to the same-named test of the wrapped {@code SubscriberVerification}. */
    @Test
    public void mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
        subscriberVerification.mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore();
    }

    /** Forwards to the same-named test of the wrapped {@code SubscriberVerification}. */
    @Test
    public void mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
        subscriberVerification.mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMore();
    }

    /** Forwards to the same-named test of the wrapped {@code SubscriberVerification}. */
    @Test
    public void mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() {
        subscriberVerification.mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void publisherSubscribeWhenCompletedMustTriggerOnCompleteAndNotOnSubscribe() throws Throwable {
        publisherVerification.publisherSubscribeWhenCompletedMustTriggerOnCompleteAndNotOnSubscribe();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void publisherSubscribeWhenInErrorStateMustTriggerOnErrorAndNotOnSubscribe() throws Throwable {
        publisherVerification.publisherSubscribeWhenInErrorStateMustTriggerOnErrorAndNotOnSubscribe();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void publisherSubscribeWhenInShutDownStateMustTriggerOnErrorAndNotOnSubscribe() throws Throwable {
        publisherVerification.publisherSubscribeWhenInShutDownStateMustTriggerOnErrorAndNotOnSubscribe();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void publisherSubscribeWhenActiveMustCallOnSubscribeFirst() throws Throwable {
        publisherVerification.publisherSubscribeWhenActiveMustCallOnSubscribeFirst();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void publisherSubscribeWhenActiveMustRejectDoubleSubscription() throws Throwable {
        publisherVerification.publisherSubscribeWhenActiveMustRejectDoubleSubscription();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void subscriptionRequestMoreWhenCancelledMustIgnoreTheCall() throws Throwable {
        publisherVerification.subscriptionRequestMoreWhenCancelledMustIgnoreTheCall();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void subscriptionRequestMoreMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
        publisherVerification.subscriptionRequestMoreMustResultInTheCorrectNumberOfProducedElements();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void subscriptionRequestMoreMustThrowIfArgumentIsNonPositive() throws Throwable {
        publisherVerification.subscriptionRequestMoreMustThrowIfArgumentIsNonPositive();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void subscriptionCancelWhenCancelledMustIgnoreCall() throws Throwable {
        publisherVerification.subscriptionCancelWhenCancelledMustIgnoreCall();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void onSubscriptionCancelThePublisherMustEventuallyCeaseToCallAnyMethodsOnTheSubscriber() throws Throwable {
        publisherVerification.onSubscriptionCancelThePublisherMustEventuallyCeaseToCallAnyMethodsOnTheSubscriber();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void onSubscriptionCancelThePublisherMustEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
        publisherVerification.onSubscriptionCancelThePublisherMustEventuallyDropAllReferencesToTheSubscriber();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void mustNotCallOnNextAfterHavingIssuedAnOnCompleteOrOnErrorCallOnASubscriber() {
        publisherVerification.mustNotCallOnNextAfterHavingIssuedAnOnCompleteOrOnErrorCallOnASubscriber();
    }

    /** Forwards to the same-named test of the wrapped {@code PublisherVerification}. */
    @Test
    public void mustProduceTheSameElementsInTheSameSequenceForAllItsSubscribers() throws Throwable {
        publisherVerification.mustProduceTheSameElementsInTheSameSequenceForAllItsSubscribers();
    }

    /**
     * Lets a first subscriber request 20 elements, delivers two of them, then adds a second
     * subscriber and checks that a third element reaches the first subscriber immediately and the
     * second subscriber only after it has requested one element.
     *
     * @throws Exception if the fixture or any expectation fails
     */
    @Test
    public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Exception {
        new TestSetup(this.env, this.testBufferSize) {
            {
                final TestEnvironment.ManualSubscriber<T> sub1 = newSubscriber();
                sub1.requestMore(20);

                // Running total of upstream demand seen so far. Another request is awaited only
                // when the demand received so far has been used up by the elements already sent.
                int totalRequests = expectRequestMore();
                expectNextElement(sub1, sendNextTFromUpstream());

                if (totalRequests == 1) {
                    totalRequests += expectRequestMore();
                }
                expectNextElement(sub1, sendNextTFromUpstream());

                if (totalRequests == 2) {
                    totalRequests += expectRequestMore();
                }

                final TestEnvironment.ManualSubscriber<T> sub2 = newSubscriber();

                // sub1 still has 18 of its 20 requested elements outstanding; sub2 has requested nothing.
                final T third = sendNextTFromUpstream();
                expectNextElement(sub1, third);
                sub2.expectNone();

                sub2.requestMore(1);
                expectNextElement(sub2, third);

                if (totalRequests == 3) {
                    expectRequestMore();
                }

                sendCompletion();
                sub1.expectCompletion(this.env.defaultTimeoutMillis());
                sub2.expectCompletion(this.env.defaultTimeoutMillis());
                this.env.verifyNoAsyncErrors();
            }
        };
    }

    /**
     * Fills the processor with {@code testBufferSize} elements while one of two subscribers never
     * requests anything, checks that no further upstream demand is signalled, then cancels that
     * subscriber and expects upstream demand to resume and the other subscriber to receive all
     * buffered elements in order.
     *
     * @throws InterruptedException if interrupted while waiting on an expectation
     */
    @Test
    public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws InterruptedException {
        new TestSetup(this.env, this.testBufferSize) {
            {
                final TestEnvironment.ManualSubscriber<T> sub1 = newSubscriber();
                final TestEnvironment.ManualSubscriber<T> sub2 = newSubscriber();

                sub1.requestMore(this.testBufferSize + 1);

                int pending = 0; // upstream demand received but not yet satisfied
                int sent = 0;
                @SuppressWarnings("unchecked") // the array never escapes and only holds values from nextT()
                final T[] sentElements = (T[]) new Object[this.testBufferSize];
                while (sent < this.testBufferSize) {
                    if (pending == 0) {
                        pending = expectRequestMore();
                    }
                    sentElements[sent] = nextT();
                    sendNext(sentElements[sent]);
                    sent++;
                    pending--;
                }

                // sub2 has not requested anything, so no further upstream demand is expected.
                expectNoRequestMore();
                sub2.cancel();
                expectRequestMore();

                for (T element : sentElements) {
                    expectNextElement(sub1, element);
                }

                sendCompletion();
                sub1.expectCompletion(this.env.defaultTimeoutMillis());
                this.env.verifyNoAsyncErrors();
            }
        };
    }
}
