package org.reactivestreams.tck;

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.support.Function;
import org.reactivestreams.tck.support.Optional;
import org.testng.Assert;
import org.testng.SkipException;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/reactivestreams/tck/PublisherVerification.class */
public abstract class PublisherVerification<T> {
    private final TestEnvironment env;
    private final long publisherReferenceGCTimeoutMillis;

    /* loaded from: input_file:org/reactivestreams/tck/PublisherVerification$PublisherTestRun.class */
    public interface PublisherTestRun<T> {
        void run(Publisher<T> publisher) throws Throwable;
    }

    public PublisherVerification(TestEnvironment testEnvironment, long j) {
        this.env = testEnvironment;
        this.publisherReferenceGCTimeoutMillis = j;
    }

    public abstract Publisher<T> createPublisher(long j);

    public abstract Publisher<T> createErrorStatePublisher();

    public long maxElementsFromPublisher() {
        return Long.MAX_VALUE;
    }

    public boolean skipStochasticTests() {
        return false;
    }

    public long boundedDepthOfOnNextAndRequestRecursion() {
        return 1L;
    }

    @BeforeMethod
    public void setUp() throws Exception {
        this.env.clearAsyncErrors();
    }

    @Test
    public void createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable {
        activePublisherTest(1L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.1
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws InterruptedException {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                Assert.assertTrue(requestNextElementOrEndOfStream(publisher, newManualSubscriber).isDefined(), String.format("Publisher %s produced no elements", publisher));
                newManualSubscriber.requestEndOfStream();
            }

            Optional<T> requestNextElementOrEndOfStream(Publisher<T> publisher, TestEnvironment.ManualSubscriber<T> manualSubscriber) throws InterruptedException {
                return manualSubscriber.requestNextElementOrEndOfStream("Timeout while waiting for next element from Publisher" + publisher);
            }
        });
    }

    @Test
    public void createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable {
        activePublisherTest(3L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.2
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws InterruptedException {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                Assert.assertTrue(requestNextElementOrEndOfStream(publisher, newManualSubscriber).isDefined(), String.format("Publisher %s produced no elements", publisher));
                Assert.assertTrue(requestNextElementOrEndOfStream(publisher, newManualSubscriber).isDefined(), String.format("Publisher %s produced only 1 element", publisher));
                Assert.assertTrue(requestNextElementOrEndOfStream(publisher, newManualSubscriber).isDefined(), String.format("Publisher %s produced only 2 elements", publisher));
                newManualSubscriber.requestEndOfStream();
            }

            Optional<T> requestNextElementOrEndOfStream(Publisher<T> publisher, TestEnvironment.ManualSubscriber<T> manualSubscriber) throws InterruptedException {
                return manualSubscriber.requestNextElementOrEndOfStream("Timeout while waiting for next element from Publisher" + publisher);
            }
        });
    }

    @Test
    public void validate_maxElementsFromPublisher() throws Exception {
        Assert.assertTrue(maxElementsFromPublisher() >= 0, "maxElementsFromPublisher MUST return a number >= 0");
    }

    @Test
    public void validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception {
        Assert.assertTrue(boundedDepthOfOnNextAndRequestRecursion() >= 1, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1");
    }

    @Test
    public void spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable {
        activePublisherTest(5L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.3
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws InterruptedException {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                newManualSubscriber.expectNone("Publisher " + publisher + " produced value before the first `request`: ");
                newManualSubscriber.request(1L);
                newManualSubscriber.nextElement("Publisher " + publisher + " produced no element after first `request`");
                newManualSubscriber.expectNone("Publisher " + publisher + " produced unrequested: ");
                newManualSubscriber.request(1L);
                newManualSubscriber.request(2L);
                newManualSubscriber.nextElements(3L, PublisherVerification.this.env.defaultTimeoutMillis(), "Publisher " + publisher + " produced less than 3 elements after two respective `request` calls");
                newManualSubscriber.expectNone("Publisher " + publisher + "produced unrequested ");
            }
        });
    }

    @Test
    public void spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable {
        activePublisherTest(3L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.4
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                newManualSubscriber.request(10L);
                newManualSubscriber.nextElements(3L);
                newManualSubscriber.expectCompletion();
            }
        });
    }

    @Test
    public void spec103_mustSignalOnMethodsSequentially() throws Throwable {
        stochasticTest(100, new Function<Integer, Void>() { // from class: org.reactivestreams.tck.PublisherVerification.5
            @Override // org.reactivestreams.tck.support.Function
            public Void apply(Integer num) throws Throwable {
                PublisherVerification.this.activePublisherTest(10L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.5.1
                    @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
                    public void run(Publisher<T> publisher) throws Throwable {
                        final TestEnvironment.Latch latch = new TestEnvironment.Latch(PublisherVerification.this.env);
                        final TestEnvironment.Latch latch2 = new TestEnvironment.Latch(PublisherVerification.this.env);
                        publisher.subscribe(new Subscriber<T>() { // from class: org.reactivestreams.tck.PublisherVerification.5.1.1
                            private Subscription subs;
                            private long gotElements = 0;
                            private String state = "init";

                            public void onSubscribe(Subscription subscription) {
                                latch.assertOpen("Expected latch to be open during onSubscribe call, state seems to be: " + this.state);
                                latch.close();
                                this.state = "onSubscribe";
                                this.subs = subscription;
                                this.subs.request(1L);
                                latch.reOpen();
                            }

                            public void onNext(T t) {
                                latch.assertOpen("Expected latch to be open during onNext call, state seems to be: " + this.state);
                                latch.close();
                                this.state = "onNext-" + t;
                                this.gotElements++;
                                if (this.gotElements <= 10) {
                                    this.subs.request(1L);
                                }
                                latch.reOpen();
                            }

                            public void onError(Throwable th) {
                                latch.assertOpen("Expected latch to be open during onError call, state seems to be: " + this.state);
                                latch.close();
                                this.state = "onError";
                                latch.reOpen();
                            }

                            public void onComplete() {
                                latch.assertOpen("Expected latch to be open during onComplete call, state seems to be: " + this.state);
                                latch.close();
                                this.state = "onComplete";
                                latch.reOpen();
                                latch2.close();
                            }
                        });
                        latch2.expectClose(10 * PublisherVerification.this.env.defaultTimeoutMillis(), "Expected 10 elements to be drained");
                    }
                });
                return null;
            }
        });
    }

    @Test
    public void spec104_mustSignalOnErrorWhenFails() throws Throwable {
        errorPublisherTest(new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.6
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(final Publisher<T> publisher) throws InterruptedException {
                final TestEnvironment.Latch latch = new TestEnvironment.Latch(PublisherVerification.this.env);
                publisher.subscribe(new TestEnvironment.TestSubscriber<T>(PublisherVerification.this.env) { // from class: org.reactivestreams.tck.PublisherVerification.6.1
                    @Override // org.reactivestreams.tck.TestEnvironment.TestSubscriber
                    public void onError(Throwable th) {
                        latch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", publisher));
                        latch.close();
                    }
                });
                latch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", publisher));
                Thread.sleep(PublisherVerification.this.env.defaultTimeoutMillis());
                PublisherVerification.this.env.verifyNoAsyncErrors();
            }
        });
    }

    @Test
    public void spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable {
        activePublisherTest(3L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.7
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                newManualSubscriber.requestNextElement();
                newManualSubscriber.requestNextElement();
                newManualSubscriber.requestNextElement();
                newManualSubscriber.requestEndOfStream();
                newManualSubscriber.expectNone();
            }
        });
    }

    @Test
    public void spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable {
        notVerified();
    }

    @Test
    public void spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable {
        activePublisherTest(1L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.8
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                newManualSubscriber.request(10L);
                newManualSubscriber.nextElement();
                newManualSubscriber.expectCompletion();
                newManualSubscriber.request(10L);
                newManualSubscriber.expectNone();
            }
        });
    }

    @Test
    public void spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable {
        notVerified();
    }

    @Test
    public void spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable {
        notVerified();
    }

    @Test
    public void spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
        notVerified();
    }

    @Test
    public void spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
        optionalActivePublisherTest(3L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.9
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                publisher.subscribe(newManualSubscriber);
                newManualSubscriber.expectErrorWithMessage(IllegalStateException.class, "1.10");
            }
        });
    }

    @Test
    public void spec111_maySupportMultiSubscribe() throws Throwable {
        optionalActivePublisherTest(1L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.10
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                PublisherVerification.this.env.newManualSubscriber(publisher);
                PublisherVerification.this.env.newManualSubscriber(publisher);
                PublisherVerification.this.env.verifyNoAsyncErrors();
            }
        });
    }

    @Test
    public void spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
        errorPublisherTest(new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.11
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                final TestEnvironment.Latch latch = new TestEnvironment.Latch(PublisherVerification.this.env);
                publisher.subscribe(new TestEnvironment.ManualSubscriberWithSubscriptionSupport<T>(PublisherVerification.this.env) { // from class: org.reactivestreams.tck.PublisherVerification.11.1
                    @Override // org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport, org.reactivestreams.tck.TestEnvironment.TestSubscriber
                    public void onError(Throwable th) {
                        latch.assertOpen("Only one onError call expected");
                        latch.close();
                    }

                    @Override // org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport, org.reactivestreams.tck.TestEnvironment.TestSubscriber
                    public void onSubscribe(Subscription subscription) {
                        this.env.flop("onSubscribe should not be called if Publisher is unable to subscribe a Subscriber");
                    }
                });
                latch.assertClosed("Should have received onError");
                PublisherVerification.this.env.verifyNoAsyncErrors();
            }
        });
    }

    @Test
    public void spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
        optionalActivePublisherTest(5L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.12
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws InterruptedException {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                TestEnvironment.ManualSubscriber<T> newManualSubscriber2 = PublisherVerification.this.env.newManualSubscriber(publisher);
                TestEnvironment.ManualSubscriber<T> newManualSubscriber3 = PublisherVerification.this.env.newManualSubscriber(publisher);
                newManualSubscriber.request(1L);
                T nextElement = newManualSubscriber.nextElement("Publisher " + publisher + " did not produce the requested 1 element on 1st subscriber");
                newManualSubscriber2.request(2L);
                List<T> nextElements = newManualSubscriber2.nextElements(2L, "Publisher " + publisher + " did not produce the requested 2 elements on 2nd subscriber");
                newManualSubscriber.request(1L);
                T nextElement2 = newManualSubscriber.nextElement("Publisher " + publisher + " did not produce the requested 1 element on 1st subscriber");
                newManualSubscriber3.request(3L);
                List<T> nextElements2 = newManualSubscriber3.nextElements(3L, "Publisher " + publisher + " did not produce the requested 3 elements on 3rd subscriber");
                newManualSubscriber3.request(1L);
                T nextElement3 = newManualSubscriber3.nextElement("Publisher " + publisher + " did not produce the requested 1 element on 3rd subscriber");
                newManualSubscriber3.request(1L);
                T nextElement4 = newManualSubscriber3.nextElement("Publisher " + publisher + " did not produce the requested 1 element on 3rd subscriber");
                newManualSubscriber3.requestEndOfStream("Publisher " + publisher + " did not complete the stream as expected on 3rd subscriber");
                newManualSubscriber2.request(3L);
                List<T> nextElements3 = newManualSubscriber2.nextElements(3L, "Publisher " + publisher + " did not produce the requested 3 elements on 2nd subscriber");
                newManualSubscriber2.requestEndOfStream("Publisher " + publisher + " did not complete the stream as expected on 2nd subscriber");
                newManualSubscriber.request(2L);
                List<T> nextElements4 = newManualSubscriber.nextElements(2L, "Publisher " + publisher + " did not produce the requested 2 elements on 1st subscriber");
                newManualSubscriber.request(1L);
                T nextElement5 = newManualSubscriber.nextElement("Publisher " + publisher + " did not produce the requested 1 element on 1st subscriber");
                newManualSubscriber.requestEndOfStream("Publisher " + publisher + " did not complete the stream as expected on 1st subscriber");
                ArrayList arrayList = new ArrayList(Arrays.asList(nextElement, nextElement2));
                arrayList.addAll(nextElements4);
                arrayList.addAll(Collections.singleton(nextElement5));
                ArrayList arrayList2 = new ArrayList(nextElements);
                arrayList2.addAll(nextElements3);
                ArrayList arrayList3 = new ArrayList(nextElements2);
                arrayList3.add(nextElement3);
                arrayList3.add(nextElement4);
                Assert.assertEquals(arrayList, arrayList2, "Publisher " + publisher + " did not produce the same element sequence for subscribers 1 and 2");
                Assert.assertEquals(arrayList, arrayList3, "Publisher " + publisher + " did not produce the same element sequence for subscribers 1 and 3");
            }
        });
    }

    @Test
    public void spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
        optionalActivePublisherTest(3L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.13
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                TestEnvironment.ManualSubscriber<T> newManualSubscriber2 = PublisherVerification.this.env.newManualSubscriber(publisher);
                TestEnvironment.ManualSubscriber<T> newManualSubscriber3 = PublisherVerification.this.env.newManualSubscriber(publisher);
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                ArrayList arrayList3 = new ArrayList();
                newManualSubscriber.request(4L);
                newManualSubscriber2.request(4L);
                newManualSubscriber3.request(4L);
                arrayList.addAll(newManualSubscriber.nextElements(3L));
                arrayList2.addAll(newManualSubscriber2.nextElements(3L));
                arrayList3.addAll(newManualSubscriber3.nextElements(3L));
                newManualSubscriber.expectCompletion();
                newManualSubscriber2.expectCompletion();
                newManualSubscriber3.expectCompletion();
                Assert.assertEquals(arrayList, arrayList2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers", new Object[0]));
                Assert.assertEquals(arrayList2, arrayList3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers", new Object[0]));
            }
        });
    }

    @Test
    public void spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable {
        activePublisherTest(6L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.14
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                PublisherVerification.this.env.subscribe(publisher, new TestEnvironment.ManualSubscriber<T>(PublisherVerification.this.env) { // from class: org.reactivestreams.tck.PublisherVerification.14.1
                    @Override // org.reactivestreams.tck.TestEnvironment.TestSubscriber
                    public void onSubscribe(Subscription subscription) {
                        this.subscription.completeImmediatly(subscription);
                        subscription.request(1L);
                        subscription.request(1L);
                        subscription.request(1L);
                    }

                    @Override // org.reactivestreams.tck.TestEnvironment.ManualSubscriber, org.reactivestreams.tck.TestEnvironment.TestSubscriber
                    public void onNext(T t) {
                        this.subscription.value().request(1L);
                    }
                });
                PublisherVerification.this.env.verifyNoAsyncErrors(PublisherVerification.this.env.defaultTimeoutMillis());
            }
        });
    }

    @Test
    public void spec303_mustNotAllowUnboundedRecursion() throws Throwable {
        activePublisherTest(boundedDepthOfOnNextAndRequestRecursion() + 1, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.15
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                final ThreadLocal<Long> threadLocal = new ThreadLocal<Long>() { // from class: org.reactivestreams.tck.PublisherVerification.15.1
                    /* JADX INFO: Access modifiers changed from: protected */
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.lang.ThreadLocal
                    public Long initialValue() {
                        return 0L;
                    }
                };
                TestEnvironment.ManualSubscriberWithSubscriptionSupport<T> manualSubscriberWithSubscriptionSupport = new TestEnvironment.ManualSubscriberWithSubscriptionSupport<T>(PublisherVerification.this.env) { // from class: org.reactivestreams.tck.PublisherVerification.15.2
                    @Override // org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport, org.reactivestreams.tck.TestEnvironment.ManualSubscriber, org.reactivestreams.tck.TestEnvironment.TestSubscriber
                    public void onNext(T t) {
                        threadLocal.set(Long.valueOf(((Long) threadLocal.get()).longValue() + 1));
                        super.onNext(t);
                        Long l = (Long) threadLocal.get();
                        if (l.longValue() > PublisherVerification.this.boundedDepthOfOnNextAndRequestRecursion()) {
                            this.env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d", l, Thread.currentThread(), Long.valueOf(PublisherVerification.this.boundedDepthOfOnNextAndRequestRecursion())));
                        }
                        this.subscription.value().request(1L);
                        threadLocal.set(Long.valueOf(((Long) threadLocal.get()).longValue() - 1));
                    }
                };
                PublisherVerification.this.env.subscribe(publisher, manualSubscriberWithSubscriptionSupport);
                manualSubscriberWithSubscriptionSupport.request(1L);
                manualSubscriberWithSubscriptionSupport.nextElementOrEndOfStream();
                PublisherVerification.this.env.verifyNoAsyncErrors();
            }
        });
    }

    @Test
    public void spec304_requestShouldNotPerformHeavyComputations() throws Exception {
        notVerified();
    }

    @Test
    public void spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation() throws Exception {
        notVerified();
    }

    @Test
    public void spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable {
        activePublisherTest(3L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.16
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                TestEnvironment.ManualSubscriberWithSubscriptionSupport<T> manualSubscriberWithSubscriptionSupport = new TestEnvironment.ManualSubscriberWithSubscriptionSupport<T>(PublisherVerification.this.env) { // from class: org.reactivestreams.tck.PublisherVerification.16.1
                    @Override // org.reactivestreams.tck.TestEnvironment.TestSubscriber
                    public void cancel() {
                        if (this.subscription.isCompleted()) {
                            this.subscription.value().cancel();
                        } else {
                            this.env.flop("Cannot cancel a subscription before having received it");
                        }
                    }
                };
                PublisherVerification.this.env.subscribe(publisher, manualSubscriberWithSubscriptionSupport);
                manualSubscriberWithSubscriptionSupport.cancel();
                manualSubscriberWithSubscriptionSupport.request(1L);
                manualSubscriberWithSubscriptionSupport.request(1L);
                manualSubscriberWithSubscriptionSupport.request(1L);
                manualSubscriberWithSubscriptionSupport.expectNone();
                PublisherVerification.this.env.verifyNoAsyncErrors();
            }
        });
    }

    @Test
    public void spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable {
        activePublisherTest(1L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.17
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                Subscription value = newManualSubscriber.subscription.value();
                value.cancel();
                value.cancel();
                value.cancel();
                newManualSubscriber.expectNone();
                PublisherVerification.this.env.verifyNoAsyncErrors();
            }
        });
    }

    @Test
    public void spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable {
        activePublisherTest(10L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.18
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                newManualSubscriber.request(0L);
                newManualSubscriber.expectErrorWithMessage(IllegalArgumentException.class, "3.9");
            }
        });
    }

    @Test
    public void spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable {
        activePublisherTest(10L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.19
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                newManualSubscriber.request(-new Random().nextInt(Integer.MAX_VALUE));
                newManualSubscriber.expectErrorWithMessage(IllegalArgumentException.class, "3.9");
            }
        });
    }

    @Test
    public void spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
        activePublisherTest(20L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.20
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                boolean z;
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                newManualSubscriber.request(10);
                newManualSubscriber.request(5);
                int i = 10 + 5;
                newManualSubscriber.cancel();
                newManualSubscriber.nextElement();
                int i2 = 1;
                do {
                    newManualSubscriber.expectNone();
                    if (PublisherVerification.this.env.dropAsyncError() == null) {
                        z = false;
                    } else {
                        i2++;
                        z = true;
                    }
                    if (!z) {
                        break;
                    }
                } while (i2 < i);
                Assert.assertTrue(i2 <= i, String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d", Integer.valueOf(i2), Integer.valueOf(i)));
            }
        });
        this.env.verifyNoAsyncErrors();
    }

    @Test
    public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable {
        final ReferenceQueue referenceQueue = new ReferenceQueue();
        final Function<Publisher<T>, WeakReference<TestEnvironment.ManualSubscriber<T>>> function = new Function<Publisher<T>, WeakReference<TestEnvironment.ManualSubscriber<T>>>() { // from class: org.reactivestreams.tck.PublisherVerification.21
            @Override // org.reactivestreams.tck.support.Function
            public WeakReference<TestEnvironment.ManualSubscriber<T>> apply(Publisher<T> publisher) throws Exception {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                WeakReference<TestEnvironment.ManualSubscriber<T>> weakReference = new WeakReference<>(newManualSubscriber, referenceQueue);
                newManualSubscriber.request(1L);
                newManualSubscriber.nextElement();
                newManualSubscriber.cancel();
                return weakReference;
            }
        };
        activePublisherTest(3L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.22
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                WeakReference weakReference = (WeakReference) function.apply(publisher);
                Thread.sleep(PublisherVerification.this.publisherReferenceGCTimeoutMillis);
                System.gc();
                if (weakReference.equals(referenceQueue.remove(100L))) {
                    return;
                }
                PublisherVerification.this.env.flop("Publisher " + publisher + " did not drop reference to test subscriber after subscription cancellation");
            }
        });
    }

    @Test
    public void spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
        activePublisherTest(3L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.23
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                newManualSubscriber.request(Long.MAX_VALUE);
                newManualSubscriber.nextElements(3L);
                newManualSubscriber.expectCompletion();
                PublisherVerification.this.env.verifyNoAsyncErrors();
            }
        });
    }

    @Test
    public void spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
        activePublisherTest(3L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.24
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                TestEnvironment.ManualSubscriber<T> newManualSubscriber = PublisherVerification.this.env.newManualSubscriber(publisher);
                newManualSubscriber.request(4611686018427387903L);
                newManualSubscriber.request(4611686018427387903L);
                newManualSubscriber.request(1L);
                newManualSubscriber.nextElements(3L);
                newManualSubscriber.expectCompletion();
                PublisherVerification.this.env.verifyNoAsyncErrors();
            }
        });
    }

    @Test
    public void spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
        final long defaultTimeoutMillis = this.env.defaultTimeoutMillis() / 10;
        activePublisherTest(2147483647L, new PublisherTestRun<T>() { // from class: org.reactivestreams.tck.PublisherVerification.25
            @Override // org.reactivestreams.tck.PublisherVerification.PublisherTestRun
            public void run(Publisher<T> publisher) throws Throwable {
                TestEnvironment.ManualSubscriber<T> newBlackholeSubscriber = PublisherVerification.this.env.newBlackholeSubscriber(publisher);
                newBlackholeSubscriber.request(9223372036854775806L);
                long j = 0;
                boolean z = false;
                while (!z && j < 10) {
                    newBlackholeSubscriber.request(9223372036854775806L);
                    Thread.sleep(defaultTimeoutMillis);
                    Throwable dropAsyncError = PublisherVerification.this.env.dropAsyncError();
                    if (dropAsyncError != null) {
                        PublisherVerification.this.env.assertErrorWithMessage(dropAsyncError, IllegalStateException.class, "3.17");
                        z = true;
                    }
                    j++;
                }
                PublisherVerification.this.env.debug(String.format("Signalled overflow after %d-th spin (of max: %d), with %dms delays: %s (`true` is expected)", Long.valueOf(j + 1), 10L, Long.valueOf(defaultTimeoutMillis), Boolean.valueOf(z)));
                Assert.assertTrue(z, String.format("Expected overflow to be signalled after %d spins (max: %d), with delays: %d", Long.valueOf(j + 1), 10L, Long.valueOf(defaultTimeoutMillis)));
                PublisherVerification.this.env.verifyNoAsyncErrors(PublisherVerification.this.env.defaultTimeoutMillis());
            }
        });
    }

    public void activePublisherTest(long j, PublisherTestRun<T> publisherTestRun) throws Throwable {
        if (j > maxElementsFromPublisher()) {
            throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", Long.valueOf(j), Long.valueOf(maxElementsFromPublisher())));
        }
        publisherTestRun.run(createPublisher(j));
        this.env.verifyNoAsyncErrors();
    }

    public void optionalActivePublisherTest(long j, PublisherTestRun<T> publisherTestRun) throws Throwable {
        if (j > maxElementsFromPublisher()) {
            throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", Long.valueOf(j), Long.valueOf(maxElementsFromPublisher())));
        }
        try {
            potentiallyPendingTest(createPublisher(j), publisherTestRun);
        } catch (AssertionError e) {
            notVerified("Skipped because tested publisher does NOT implement this OPTIONAL requirement.");
        } catch (Exception e2) {
            notVerified("Skipped because tested publisher does NOT implement this OPTIONAL requirement.");
        }
    }

    public void errorPublisherTest(PublisherTestRun<T> publisherTestRun) throws Throwable {
        potentiallyPendingTest(createErrorStatePublisher(), publisherTestRun);
    }

    public void potentiallyPendingTest(Publisher<T> publisher, PublisherTestRun<T> publisherTestRun) throws Throwable {
        if (publisher == null) {
            throw new SkipException("Skipping, because no Publisher was provided for this type of test");
        }
        publisherTestRun.run(publisher);
    }

    public void stochasticTest(int i, Function<Integer, Void> function) throws Throwable {
        if (skipStochasticTests()) {
            notVerified("Skipping @Stochastic test because `skipStochasticTests()` returned `true`!");
        }
        for (int i2 = 0; i2 < i; i2++) {
            function.apply(Integer.valueOf(i2));
        }
    }

    public void notVerified() {
        throw new SkipException("Not verified by this TCK.");
    }

    public void notVerified(String str) {
        throw new SkipException(str);
    }
}
