package org.reactivestreams.tck;

import org.reactivestreams.spi.Publisher;
import org.reactivestreams.spi.Subscriber;
import org.reactivestreams.spi.Subscription;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.Test;

/* loaded from: input_file:org/reactivestreams/tck/SubscriberVerification.class */
public abstract class SubscriberVerification<T> {
    private final TestEnvironment env;

    /* loaded from: input_file:org/reactivestreams/tck/SubscriberVerification$SubscriberProbe.class */
    /**
     * Callback surface through which a subscriber under test reports the signals it has seen.
     * An instance is passed to {@code createSubscriber}; {@code TestSetup.Probe} is the
     * implementation used by the tests in this class.
     *
     * @param <T> element type of the stream being verified
     */
    interface SubscriberProbe<T> {
        /**
         * Reports that the subscriber has been subscribed and hands over the puppet that the
         * tests use to drive it.
         *
         * @param subscriberPuppet control handle for the subscriber under test
         */
        void registerOnSubscribe(SubscriberPuppet subscriberPuppet);

        /**
         * Reports an element received by the subscriber.
         *
         * @param t the received element
         */
        void registerOnNext(T t);

        /** Reports that the subscriber received the completion signal. */
        void registerOnComplete();

        /**
         * Reports that the subscriber received an error signal.
         *
         * @param th the received error
         */
        void registerOnError(Throwable th);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reactivestreams/tck/SubscriberVerification$SubscriberPuppet.class */
    /**
     * Control handle supplied by the subscriber under test via
     * {@code SubscriberProbe.registerOnSubscribe}; the tests call it to make the subscriber act.
     */
    public interface SubscriberPuppet {
        /**
         * Asks the subscriber to shut down. The shutdown test in this class expects the
         * upstream to observe a cancellation afterwards.
         */
        void triggerShutdown();

        /**
         * Asks the subscriber to signal demand upstream.
         *
         * @param i NOTE(review): presumably the number of additional elements to request;
         *          the tests here always pass 1 - confirm against implementations
         */
        void triggerRequestMore(int i);

        /**
         * Asks the subscriber to cancel. The tests in this class follow this call with
         * {@code expectCancelling()} on the upstream publisher.
         */
        void triggerCancel();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reactivestreams/tck/SubscriberVerification$TestSetup.class */
    public class TestSetup extends TestEnvironment.ManualPublisher<T> {
        TestEnvironment.ManualSubscriber<T> tees;
        SubscriberVerification<T>.TestSetup.Probe probe;
        T lastT;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/reactivestreams/tck/SubscriberVerification$TestSetup$Probe.class */
        /**
         * {@link SubscriberProbe} used by the tests: it stores every signal the subscriber under
         * test reports and offers {@code expect*} helpers that wait for a signal and compare it
         * with what the test sent. Mismatches are reported through the environment's
         * {@code flop}.
         */
        public class Probe implements SubscriberProbe<T> {
            /** Completed by the first {@link #registerOnSubscribe} call. */
            TestEnvironment.Promise<SubscriberPuppet> puppet;
            /** Elements reported through {@link #registerOnNext}; read back by {@code expectNext}. */
            TestEnvironment.Receptacle<T> elements;
            /** Closed by {@link #registerOnComplete}. */
            TestEnvironment.Latch completed;
            /** Completed with the throwable passed to {@link #registerOnError}. */
            TestEnvironment.Promise<Throwable> error;

            /** Creates empty recorders, all bound to the enclosing setup's test environment. */
            Probe() {
                this.puppet = new TestEnvironment.Promise<>(TestSetup.this.env);
                this.elements = new TestEnvironment.Receptacle<>(TestSetup.this.env);
                this.completed = new TestEnvironment.Latch(TestSetup.this.env);
                this.error = new TestEnvironment.Promise<>(TestSetup.this.env);
            }

            /**
             * Stores the puppet on the first call; any further call is reported as an illegally
             * accepted second subscription.
             *
             * @param subscriberPuppet control handle supplied by the subscriber under test
             */
            @Override // org.reactivestreams.tck.SubscriberVerification.SubscriberProbe
            public void registerOnSubscribe(SubscriberPuppet subscriberPuppet) {
                if (this.puppet.isCompleted()) {
                    TestSetup.this.env.flop(String.format("Subscriber %s illegally accepted a second Subscription", TestSetup.this.sub()));
                } else {
                    this.puppet.complete(subscriberPuppet);
                }
            }

            /**
             * Records an element the subscriber received.
             *
             * @param t the received element
             */
            @Override // org.reactivestreams.tck.SubscriberVerification.SubscriberProbe
            public void registerOnNext(T t) {
                this.elements.add(t);
            }

            /** Records that the subscriber received the completion signal. */
            @Override // org.reactivestreams.tck.SubscriberVerification.SubscriberProbe
            public void registerOnComplete() {
                this.completed.close();
            }

            /**
             * Records the error the subscriber received.
             *
             * @param th the received error
             */
            @Override // org.reactivestreams.tck.SubscriberVerification.SubscriberProbe
            public void registerOnError(Throwable th) {
                this.error.complete(th);
            }

            /**
             * Same as {@link #expectNext(Object, long)} using the environment's default timeout.
             *
             * @param t the element the subscriber is expected to report
             * @throws InterruptedException if interrupted while waiting
             */
            void expectNext(T t) throws InterruptedException {
                expectNext(t, TestSetup.this.env.defaultTimeoutMillis());
            }

            /**
             * Waits for the next reported element and flops if it does not equal {@code t}.
             *
             * @param t the element the subscriber is expected to report
             * @param j timeout in milliseconds
             * @throws InterruptedException if interrupted while waiting
             */
            void expectNext(T t, long j) throws InterruptedException {
                final T received = this.elements.next(j, String.format("Subscriber %s did not call `registerOnNext(%s)`", TestSetup.this.sub(), t));
                // Null-safe comparison: should the receptacle hand back null, report the mismatch
                // through flop like every other failure instead of dying with a NullPointerException.
                final boolean matches = (received == null) ? (t == null) : received.equals(t);
                if (!matches) {
                    TestSetup.this.env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", TestSetup.this.sub(), received, t));
                }
            }

            /**
             * Same as {@link #expectCompletion(long)} using the environment's default timeout.
             *
             * @throws InterruptedException if interrupted while waiting
             */
            void expectCompletion() throws InterruptedException {
                expectCompletion(TestSetup.this.env.defaultTimeoutMillis());
            }

            /**
             * Waits for the completion latch to be closed by {@link #registerOnComplete}.
             *
             * @param j timeout in milliseconds
             * @throws InterruptedException if interrupted while waiting
             */
            void expectCompletion(long j) throws InterruptedException {
                this.completed.expectClose(j, String.format("Subscriber %s did not call `registerOnComplete()`", TestSetup.this.sub()));
            }

            /**
             * Same as {@link #expectError(Throwable, long)} using the environment's default timeout.
             *
             * @param th the exact throwable instance the subscriber is expected to report
             * @throws InterruptedException if interrupted while waiting
             */
            void expectError(Throwable th) throws InterruptedException {
                expectError(th, TestSetup.this.env.defaultTimeoutMillis());
            }

            /**
             * Waits for an error to be reported and flops unless it is the very same instance as
             * {@code th} (identity comparison, not {@code equals}).
             *
             * @param th the exact throwable instance the subscriber is expected to report
             * @param j  timeout in milliseconds
             * @throws InterruptedException if interrupted while waiting
             */
            void expectError(Throwable th, long j) throws InterruptedException {
                this.error.expectCompletion(j, String.format("Subscriber %s did not call `registerOnError(%s)`", TestSetup.this.sub(), th));
                if (this.error.value() != th) {
                    TestSetup.this.env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", TestSetup.this.sub(), this.error.value(), th));
                }
            }

            /** Delegates to the test environment's {@code verifyNoAsyncErrors()}. */
            public void verifyNoAsyncErrors() {
                TestSetup.this.env.verifyNoAsyncErrors();
            }
        }

        public TestSetup(TestEnvironment testEnvironment) throws InterruptedException {
            super(testEnvironment);
            this.lastT = null;
            this.tees = testEnvironment.newManualSubscriber(SubscriberVerification.this.createHelperPublisher(0));
            this.probe = new Probe();
            subscribe(SubscriberVerification.this.createSubscriber(this.probe));
            this.probe.puppet.expectCompletion(testEnvironment.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub()));
        }

        /**
         * Returns the subscriber held by the inherited {@code subscriber} field.
         *
         * @return the result of {@code subscriber.get()}
         */
        Subscriber<T> sub() {
            final Subscriber<T> current = this.subscriber.get();
            return current;
        }

        /**
         * Returns the puppet that the subscriber under test registered with the probe.
         *
         * @return the value of the probe's puppet promise
         */
        SubscriberPuppet puppet() {
            final TestEnvironment.Promise<SubscriberPuppet> registered = this.probe.puppet;
            return registered.value();
        }

        /* JADX WARN: Multi-variable type inference failed */
        /**
         * Pulls the next test element via {@code nextT()} (which also stores it in
         * {@code lastT}) and passes it to {@code sendNext}.
         *
         * @throws InterruptedException if interrupted while obtaining the element
         */
        void sendNextTFromUpstream() throws InterruptedException {
            final T element = nextT();
            sendNext(element);
        }

        /**
         * Requests the next element from the helper subscriber, remembers it in {@code lastT}
         * and returns it.
         *
         * @return the element just obtained
         * @throws InterruptedException if interrupted while waiting for the element
         */
        T nextT() throws InterruptedException {
            final T pulled = this.tees.requestNextElement();
            this.lastT = pulled;
            return pulled;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores the test environment that every {@code TestSetup} created by the tests is built with.
     *
     * @param testEnvironment environment used by the tests in this class
     */
    public SubscriberVerification(TestEnvironment testEnvironment) {
        this.env = testEnvironment;
    }

    /**
     * Creates the subscriber to be verified. {@code TestSetup} subscribes the returned instance
     * to its manual publisher and then waits for it to call
     * {@code subscriberProbe.registerOnSubscribe(...)}; the {@code expect*} checks likewise rely
     * on the subscriber reporting its signals through the probe.
     *
     * @param subscriberProbe probe the subscriber must report to
     * @return the subscriber under test
     */
    abstract Subscriber<T> createSubscriber(SubscriberProbe<T> subscriberProbe);

    /**
     * Creates the publisher that supplies the test elements. {@code TestSetup} attaches a
     * manual subscriber to it and pulls one element per {@code nextT()} call.
     *
     * @param i NOTE(review): meaning not visible in this file; {@code TestSetup} always passes 0
     *          and keeps requesting elements on demand, so 0 presumably asks for a stream that
     *          does not run out - confirm against the implementations
     * @return publisher of test elements
     */
    abstract Publisher<T> createHelperPublisher(int i);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Walks through a regular exchange: demand is triggered, two elements are sent from upstream
     * and each is expected on the probe, then the subscriber is told to cancel and the
     * cancellation is expected upstream. Finishes by checking for asynchronously recorded errors.
     *
     * @throws InterruptedException if interrupted while waiting for a signal
     */
    @Test
    public void exerciseHappyPath() throws InterruptedException {
        new TestSetup(this.env) {
            {
                // Two demand triggers up front; the first observed request is remembered.
                puppet().triggerRequestMore(1);
                puppet().triggerRequestMore(1);
                final int firstObservedRequest = expectRequestMore();

                sendNextTFromUpstream();
                probe.expectNext(lastT);

                puppet().triggerRequestMore(1);
                // Only when the first observed request was 1 is another request awaited
                // before the second element is sent.
                final boolean awaitFurtherRequest = firstObservedRequest == 1;
                if (awaitFurtherRequest) {
                    expectRequestMore();
                }

                sendNextTFromUpstream();
                probe.expectNext(lastT);

                puppet().triggerCancel();
                expectCancelling();

                env.verifyNoAsyncErrors();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Placeholder: the body is empty, so nothing is verified here yet.
     * TODO: implement the check described by the method name.
     */
    @Test
    public void onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Placeholder: the body is empty, so nothing is verified here yet.
     * TODO: implement the check described by the method name.
     */
    @Test
    public void onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands the already-subscribed subscriber a second {@link Subscription} whose methods report
     * a failure when invoked, then checks that no failure was recorded. A subscriber that calls
     * {@code requestMore} or {@code cancel} on this second subscription therefore fails the test.
     *
     * @throws InterruptedException if interrupted while the setup waits for the subscriber
     */
    @Test
    public void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription() throws InterruptedException {
        new TestSetup(this.env) {
            {
                // `env` and `sub()` inside the Subscription resolve to the members of this
                // enclosing TestSetup instance; the decompiler-generated qualifier
                // `AnonymousClass2.this` named a class that does not exist in source form.
                sub().onSubscribe(new Subscription() {
                    public void requestMore(int i) {
                        env.flop(String.format("Subscriber %s illegally called `subscription.requestMore(%s)`", sub(), Integer.valueOf(i)));
                    }

                    public void cancel() {
                        env.flop(String.format("Subscriber %s illegally called `subscription.cancel()`", sub()));
                    }
                });
                env.verifyNoAsyncErrors();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells the subscriber to shut down while it is still subscribed and expects the upstream
     * to observe a cancellation, then checks for asynchronously recorded errors.
     *
     * @throws InterruptedException if interrupted while waiting for a signal
     */
    @Test
    public void mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription() throws InterruptedException {
        new TestSetup(this.env) {
            {
                final SubscriberPuppet control = puppet();
                control.triggerShutdown();

                expectCancelling();
                env.verifyNoAsyncErrors();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Placeholder: the body is empty, so nothing is verified here yet.
     * TODO: implement the check described by the method name.
     */
    @Test
    public void mustEnsureThatAllCallsOnASubscriptionTakePlaceFromTheSameThreadOrProvideExternalSync() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Triggers demand, has the subscriber cancel, waits for the cancellation upstream and then
     * still sends one element. The only check afterwards is that no asynchronous error was
     * recorded; nothing is expected on the probe.
     *
     * @throws InterruptedException if interrupted while waiting for a signal
     */
    @Test
    public void mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCancel() throws InterruptedException {
        new TestSetup(this.env) {
            {
                puppet().triggerRequestMore(1);
                puppet().triggerCancel();
                expectCancelling();

                // An element delivered after the cancellation was observed.
                sendNextTFromUpstream();

                env.verifyNoAsyncErrors();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Triggers demand, sends completion from upstream and expects the probe to observe it, then
     * checks for asynchronously recorded errors.
     *
     * @throws InterruptedException if interrupted while waiting for a signal
     */
    @Test
    public void mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
        new TestSetup(this.env) {
            {
                final SubscriberPuppet control = puppet();
                control.triggerRequestMore(1);

                sendCompletion();
                probe.expectCompletion();

                env.verifyNoAsyncErrors();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends completion from upstream without triggering any demand first and expects the probe
     * to observe it, then checks for asynchronously recorded errors.
     *
     * @throws InterruptedException if interrupted while waiting for a signal
     */
    @Test
    public void mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
        new TestSetup(this.env) {
            {
                // No demand is triggered before the terminal signal.
                sendCompletion();
                probe.expectCompletion();

                env.verifyNoAsyncErrors();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Triggers demand, sends an error from upstream and expects the probe to observe that same
     * exception instance, then checks for asynchronously recorded errors.
     *
     * @throws InterruptedException if interrupted while waiting for a signal
     */
    @Test
    public void mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
        new TestSetup(this.env) {
            {
                final SubscriberPuppet control = puppet();
                control.triggerRequestMore(1);

                final RuntimeException upstreamFailure = new RuntimeException("Test exception");
                sendError(upstreamFailure);
                probe.expectError(upstreamFailure);

                env.verifyNoAsyncErrors();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends an error from upstream without triggering any demand first and expects the probe to
     * observe that same exception instance, then checks for asynchronously recorded errors.
     *
     * @throws InterruptedException if interrupted while waiting for a signal
     */
    @Test
    public void mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
        new TestSetup(this.env) {
            {
                // No demand is triggered before the terminal signal.
                final RuntimeException upstreamFailure = new RuntimeException("Test exception");
                sendError(upstreamFailure);
                probe.expectError(upstreamFailure);

                env.verifyNoAsyncErrors();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Placeholder: the body is empty, so nothing is verified here yet.
     * TODO: implement the check described by the method name.
     */
    @Test
    public void mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() {
    }
}
