package org.reactivestreams.tck;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.spi.Publisher;
import org.reactivestreams.spi.Subscriber;
import org.reactivestreams.spi.Subscription;
import org.reactivestreams.tck.support.NonFatal;
import org.reactivestreams.tck.support.Optional;
import org.testng.Assert;

/* loaded from: input_file:org/reactivestreams/tck/TestEnvironment.class */
public class TestEnvironment {
    public static final int TEST_BUFFER_SIZE = 16;
    private final long defaultTimeoutMillis;
    private CopyOnWriteArrayList<Throwable> asyncErrors = new CopyOnWriteArrayList<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reactivestreams/tck/TestEnvironment$Latch.class */
    public static class Latch {
        private final TestEnvironment env;
        private volatile CountDownLatch countDownLatch = new CountDownLatch(1);

        public Latch(TestEnvironment testEnvironment) {
            this.env = testEnvironment;
        }

        public void reOpen() {
            this.countDownLatch = new CountDownLatch(1);
        }

        public boolean isClosed() {
            return this.countDownLatch.getCount() == 0;
        }

        public void close() {
            this.countDownLatch.countDown();
        }

        public void assertClosed(String str) {
            if (isClosed()) {
                return;
            }
            this.env.flop(str);
        }

        public void assertOpen(String str) {
            if (isClosed()) {
                this.env.flop(str);
            }
        }

        public void expectClose(long j, String str) throws InterruptedException {
            this.countDownLatch.await(j, TimeUnit.MILLISECONDS);
            if (this.countDownLatch.getCount() > 0) {
                this.env.flop(String.format("%s within %d ms", str, Long.valueOf(j)));
            }
        }
    }

    /* loaded from: input_file:org/reactivestreams/tck/TestEnvironment$ManualPublisher.class */
    static class ManualPublisher<T> implements Publisher<T> {
        protected final TestEnvironment env;
        Optional<Subscriber<T>> subscriber = Optional.empty();
        Receptacle<Integer> requests;
        Latch cancelled;

        public ManualPublisher(TestEnvironment testEnvironment) {
            this.env = testEnvironment;
            this.requests = new Receptacle<>(testEnvironment);
            this.cancelled = new Latch(testEnvironment);
        }

        public void subscribe(Subscriber<T> subscriber) {
            if (!this.subscriber.isEmpty()) {
                this.env.flop("TestPublisher doesn't support more than one Subscriber");
            } else {
                this.subscriber = Optional.of(subscriber);
                subscriber.onSubscribe(new Subscription() { // from class: org.reactivestreams.tck.TestEnvironment.ManualPublisher.1
                    public void requestMore(int i) {
                        ManualPublisher.this.requests.add(Integer.valueOf(i));
                    }

                    public void cancel() {
                        ManualPublisher.this.cancelled.close();
                    }
                });
            }
        }

        public void sendNext(T t) {
            if (this.subscriber.isDefined()) {
                this.subscriber.get().onNext(t);
            } else {
                this.env.flop("Cannot sendNext before subscriber subscription");
            }
        }

        public void sendCompletion() {
            if (this.subscriber.isDefined()) {
                this.subscriber.get().onComplete();
            } else {
                this.env.flop("Cannot sendCompletion before subscriber subscription");
            }
        }

        public void sendError(Throwable th) {
            if (this.subscriber.isDefined()) {
                this.subscriber.get().onError(th);
            } else {
                this.env.flop("Cannot sendError before subscriber subscription");
            }
        }

        public int nextRequestMore() throws InterruptedException {
            return nextRequestMore(this.env.defaultTimeoutMillis());
        }

        public int nextRequestMore(long j) throws InterruptedException {
            return this.requests.next(j, "Did not receive expected `requestMore` call").intValue();
        }

        public int expectRequestMore() throws InterruptedException {
            return expectRequestMore(this.env.defaultTimeoutMillis());
        }

        public int expectRequestMore(long j) throws InterruptedException {
            int nextRequestMore = nextRequestMore(j);
            if (nextRequestMore > 0) {
                return nextRequestMore;
            }
            this.env.flop(String.format("Requests cannot be zero or negative but received requestMore(%s)", Integer.valueOf(nextRequestMore)));
            return 0;
        }

        public void expectExactRequestMore(int i) throws InterruptedException {
            expectExactRequestMore(i, this.env.defaultTimeoutMillis());
        }

        public void expectExactRequestMore(int i, long j) throws InterruptedException {
            int expectRequestMore = expectRequestMore(j);
            if (expectRequestMore != i) {
                this.env.flop(String.format("Received `requestMore(%d)` on upstream but expected `requestMore(%d)`", Integer.valueOf(expectRequestMore), Integer.valueOf(i)));
            }
        }

        public void expectNoRequestMore() throws InterruptedException {
            expectNoRequestMore(this.env.defaultTimeoutMillis());
        }

        public void expectNoRequestMore(long j) throws InterruptedException {
            this.requests.expectNone(j, "Received an unexpected call to: requestMore");
        }

        public void expectCancelling() throws InterruptedException {
            expectCancelling(this.env.defaultTimeoutMillis());
        }

        public void expectCancelling(long j) throws InterruptedException {
            this.cancelled.expectClose(j, "Did not receive expected cancelling of upstream subscription");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reactivestreams/tck/TestEnvironment$ManualSubscriber.class */
    public static class ManualSubscriber<T> extends TestSubscriber<T> {
        Receptacle<T> received;

        public ManualSubscriber(TestEnvironment testEnvironment) {
            super(testEnvironment);
            this.received = new Receptacle<>(this.env);
        }

        @Override // org.reactivestreams.tck.TestEnvironment.TestSubscriber
        public void onNext(T t) {
            this.received.add(t);
        }

        @Override // org.reactivestreams.tck.TestEnvironment.TestSubscriber
        public void onComplete() {
            this.received.complete();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void requestMore(int i) {
            this.subscription.value().requestMore(i);
        }

        public T requestNextElement() throws InterruptedException {
            return requestNextElement(this.env.defaultTimeoutMillis());
        }

        public T requestNextElement(long j) throws InterruptedException {
            return requestNextElement(j, "Did not receive expected element");
        }

        public T requestNextElement(String str) throws InterruptedException {
            return requestNextElement(this.env.defaultTimeoutMillis(), str);
        }

        public T requestNextElement(long j, String str) throws InterruptedException {
            requestMore(1);
            return nextElement(j, str);
        }

        public Optional<T> requestNextElementOrEndOfStream(String str) throws InterruptedException {
            return requestNextElementOrEndOfStream(this.env.defaultTimeoutMillis(), str);
        }

        public Optional<T> requestNextElementOrEndOfStream(long j) throws InterruptedException {
            return requestNextElementOrEndOfStream(j, "Did not receive expected stream completion");
        }

        public Optional<T> requestNextElementOrEndOfStream(long j, String str) throws InterruptedException {
            requestMore(1);
            return nextElementOrEndOfStream(j, str);
        }

        public void requestEndOfStream() throws InterruptedException {
            requestEndOfStream(this.env.defaultTimeoutMillis(), "Did not receive expected stream completion");
        }

        public void requestEndOfStream(long j) throws InterruptedException {
            requestEndOfStream(j, "Did not receive expected stream completion");
        }

        public void requestEndOfStream(String str) throws InterruptedException {
            requestEndOfStream(this.env.defaultTimeoutMillis(), str);
        }

        public void requestEndOfStream(long j, String str) throws InterruptedException {
            requestMore(1);
            expectCompletion(j, str);
        }

        public List<T> requestNextElements(int i, long j, String str) throws InterruptedException {
            requestMore(i);
            return nextElements(i, j, str);
        }

        public T nextElement() throws InterruptedException {
            return nextElement(this.env.defaultTimeoutMillis());
        }

        public T nextElement(long j) throws InterruptedException {
            return nextElement(j, "Did not receive expected element");
        }

        public T nextElement(String str) throws InterruptedException {
            return nextElement(this.env.defaultTimeoutMillis(), str);
        }

        public T nextElement(long j, String str) throws InterruptedException {
            return this.received.next(j, str);
        }

        public Optional<T> nextElementOrEndOfStream(long j) throws InterruptedException {
            return nextElementOrEndOfStream(j, "Did not receive expected stream completion");
        }

        public Optional<T> nextElementOrEndOfStream(long j, String str) throws InterruptedException {
            return this.received.nextOrEndOfStream(j, str);
        }

        public List<T> nextElements(int i) throws InterruptedException {
            return nextElements(i, this.env.defaultTimeoutMillis(), "Did not receive expected element or completion");
        }

        public List<T> nextElements(int i, String str) throws InterruptedException {
            return nextElements(i, this.env.defaultTimeoutMillis(), str);
        }

        public List<T> nextElements(int i, long j) throws InterruptedException {
            return nextElements(i, j, "Did not receive expected element or completion");
        }

        public List<T> nextElements(int i, long j, String str) throws InterruptedException {
            return this.received.nextN(i, j, str);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void expectNext(T t) throws InterruptedException {
            expectNext(t, this.env.defaultTimeoutMillis());
        }

        void expectNext(T t, long j) throws InterruptedException {
            T nextElement = nextElement(j, "Did not receive expected element on downstream");
            if (nextElement.equals(t)) {
                return;
            }
            this.env.flop(String.format("Expected element %s on downstream but received %s", t, nextElement));
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void expectCompletion(long j) throws InterruptedException {
            expectCompletion(j, "Did not receive expected stream completion");
        }

        void expectCompletion(String str) throws InterruptedException {
            expectCompletion(this.env.defaultTimeoutMillis(), str);
        }

        void expectCompletion(long j, String str) throws InterruptedException {
            this.received.expectCompletion(j, str);
        }

        public void expectNone() throws InterruptedException {
            expectNone(this.env.defaultTimeoutMillis());
        }

        public void expectNone(String str) throws InterruptedException {
            this.received.expectNone(this.env.defaultTimeoutMillis(), str);
        }

        public void expectNone(long j) throws InterruptedException {
            this.received.expectNone(j, "Did not expect an element but got ");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reactivestreams/tck/TestEnvironment$ManualSubscriberWithSubscriptionSupport.class */
    public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> {
        public ManualSubscriberWithSubscriptionSupport(TestEnvironment testEnvironment) {
            super(testEnvironment);
        }

        @Override // org.reactivestreams.tck.TestEnvironment.ManualSubscriber, org.reactivestreams.tck.TestEnvironment.TestSubscriber
        public void onNext(T t) {
            if (this.subscription.isCompleted()) {
                super.onNext(t);
            } else {
                this.env.flop("Subscriber::onNext(" + t + ") called before Subscriber::onSubscribe");
            }
        }

        @Override // org.reactivestreams.tck.TestEnvironment.ManualSubscriber, org.reactivestreams.tck.TestEnvironment.TestSubscriber
        public void onComplete() {
            if (this.subscription.isCompleted()) {
                super.onComplete();
            } else {
                this.env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe");
            }
        }

        @Override // org.reactivestreams.tck.TestEnvironment.TestSubscriber
        public void onSubscribe(Subscription subscription) {
            if (this.subscription.isCompleted()) {
                this.env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber");
            } else {
                this.subscription.complete(subscription);
            }
        }

        @Override // org.reactivestreams.tck.TestEnvironment.TestSubscriber
        public void onError(Throwable th) {
            if (this.subscription.isCompleted()) {
                super.onError(th);
            } else {
                this.env.flop("Subscriber::onError(" + th + ") called before Subscriber::onSubscribe");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reactivestreams/tck/TestEnvironment$Promise.class */
    public static class Promise<T> {
        private final TestEnvironment env;
        private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<>(1);
        private volatile T _value = null;

        public Promise(TestEnvironment testEnvironment) {
            this.env = testEnvironment;
        }

        public T value() {
            if (isCompleted()) {
                return this._value;
            }
            this.env.flop("Cannot access promise value before completion");
            return null;
        }

        public boolean isCompleted() {
            return this._value != null;
        }

        public void complete(T t) {
            this.abq.add(t);
        }

        public void assertCompleted(String str) {
            if (isCompleted()) {
                return;
            }
            this.env.flop(str);
        }

        public void assertUncompleted(String str) {
            if (isCompleted()) {
                this.env.flop(str);
            }
        }

        public void expectCompletion(long j, String str) throws InterruptedException {
            if (isCompleted()) {
                return;
            }
            T poll = this.abq.poll(j, TimeUnit.MILLISECONDS);
            if (poll == null) {
                this.env.flop(String.format("%s within %d ms", str, Long.valueOf(j)));
            } else {
                this._value = poll;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reactivestreams/tck/TestEnvironment$Receptacle.class */
    public static class Receptacle<T> {
        private final TestEnvironment env;
        final int QUEUE_SIZE = 32;
        private ArrayBlockingQueue<Optional<T>> abq = new ArrayBlockingQueue<>(32);

        /* JADX INFO: Access modifiers changed from: package-private */
        public Receptacle(TestEnvironment testEnvironment) {
            this.env = testEnvironment;
        }

        public void add(T t) {
            this.abq.add(Optional.of(t));
        }

        public void complete() {
            this.abq.add(Optional.empty());
        }

        public T next(long j, String str) throws InterruptedException {
            Optional<T> poll = this.abq.poll(j, TimeUnit.MILLISECONDS);
            if (poll.isEmpty()) {
                this.env.flop("Expected element but got end-of-stream");
                return null;
            }
            if (poll.get() != null) {
                return poll.get();
            }
            this.env.flop(String.format("%s within %d ms", str, Long.valueOf(j)));
            return null;
        }

        public Optional<T> nextOrEndOfStream(long j, String str) throws InterruptedException {
            Optional<T> poll = this.abq.poll(j, TimeUnit.MILLISECONDS);
            if (poll.isDefined()) {
                return poll;
            }
            this.env.flop(String.format("%s within %d ms", str, Long.valueOf(j)));
            return null;
        }

        public List<T> nextN(int i, long j, String str) throws InterruptedException {
            LinkedList linkedList = new LinkedList();
            for (int i2 = i; i2 > 0; i2--) {
                linkedList.add(next(j, str));
            }
            return linkedList;
        }

        public void expectCompletion(long j, String str) throws InterruptedException {
            Optional<T> poll = this.abq.poll(j, TimeUnit.MILLISECONDS);
            if (poll.isEmpty()) {
                return;
            }
            if (poll.get() == null) {
                this.env.flop(String.format("%s within %d ms", str, Long.valueOf(j)));
            } else {
                this.env.flop("Expected end-of-stream but got " + poll.get());
            }
        }

        void expectNone(long j, String str) throws InterruptedException {
            Thread.sleep(j);
            Optional<T> poll = this.abq.poll();
            if (poll == null) {
                return;
            }
            if (poll.isDefined()) {
                this.env.flop(str + poll.get());
            } else {
                this.env.flop("Expected no element but got end-of-stream");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reactivestreams/tck/TestEnvironment$TestSubscriber.class */
    public static class TestSubscriber<T> implements Subscriber<T> {
        volatile Promise<Subscription> subscription;
        protected final TestEnvironment env;

        public TestSubscriber(TestEnvironment testEnvironment) {
            this.env = testEnvironment;
            this.subscription = new Promise<>(testEnvironment);
        }

        public void onError(Throwable th) {
            this.env.flop(String.format("Unexpected Subscriber::onError(%s)", th));
        }

        public void onComplete() {
            this.env.flop("Unexpected Subscriber::onComplete()");
        }

        public void onNext(T t) {
            this.env.flop(String.format("Unexpected Subscriber::onNext(%s)", t));
        }

        public void onSubscribe(Subscription subscription) {
            this.env.flop(String.format("Unexpected Subscriber::onSubscribe(%s)", subscription));
        }

        public void cancel() {
            if (!this.subscription.isCompleted()) {
                this.env.flop("Cannot cancel a subscription before having received it");
            } else {
                this.subscription.value().cancel();
                this.subscription = new Promise<>(this.env);
            }
        }
    }

    public TestEnvironment(long j) {
        this.defaultTimeoutMillis = j;
    }

    public long defaultTimeoutMillis() {
        return this.defaultTimeoutMillis;
    }

    public void flop(String str) {
        try {
            Assert.fail(str);
        } catch (Throwable th) {
            this.asyncErrors.add(th);
            throw new RuntimeException(th);
        }
    }

    public <T extends Throwable> void expectThrowingOf(Class<T> cls, String str, Runnable runnable) throws Throwable {
        try {
            runnable.run();
            flop(str);
        } catch (Throwable th) {
            if (cls.isInstance(th)) {
                return;
            }
            if (!NonFatal.apply(th)) {
                throw th;
            }
            flop(str + " but " + th);
        }
    }

    public <T> void subscribe(Publisher<T> publisher, TestSubscriber<T> testSubscriber) throws InterruptedException {
        subscribe(publisher, testSubscriber, this.defaultTimeoutMillis);
    }

    public <T> void subscribe(Publisher<T> publisher, TestSubscriber<T> testSubscriber, long j) throws InterruptedException {
        publisher.subscribe(testSubscriber);
        testSubscriber.subscription.expectCompletion(j, String.format("Could not subscribe %s to Publisher %s", testSubscriber, publisher));
        verifyNoAsyncErrors();
    }

    public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> publisher) throws InterruptedException {
        return newManualSubscriber(publisher, defaultTimeoutMillis());
    }

    public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> publisher, long j) throws InterruptedException {
        ManualSubscriberWithSubscriptionSupport manualSubscriberWithSubscriptionSupport = new ManualSubscriberWithSubscriptionSupport(this);
        subscribe(publisher, manualSubscriberWithSubscriptionSupport, j);
        return manualSubscriberWithSubscriptionSupport;
    }

    public void verifyNoAsyncErrors() {
        Iterator<Throwable> it = this.asyncErrors.iterator();
        while (it.hasNext()) {
            Throwable next = it.next();
            if (next instanceof AssertionError) {
                throw ((AssertionError) next);
            }
            Assert.fail("Async error during test execution: " + next);
        }
    }
}
