package io.vertx.amqp;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.proton.ProtonDelivery;
import java.util.Arrays;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.message.Message;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.After;
import org.junit.Test;

/* loaded from: input_file:io/vertx/amqp/ReceiverTest.class */
public class ReceiverTest extends BareTestBase {
    private MockServer server;

    @Override // io.vertx.amqp.BareTestBase
    @After
    public void tearDown() throws InterruptedException {
        super.tearDown();
        if (this.server != null) {
            this.server.close();
        }
    }

    @Test(timeout = 10000)
    public void testReceiveMessageWithApplicationProperties(TestContext testContext) throws Exception {
        String methodName = this.name.getMethodName();
        String str = "myMessageContent-" + methodName;
        String str2 = "appPropKey";
        String str3 = "appPropValue";
        Async async = testContext.async();
        Async async2 = testContext.async();
        this.server = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                protonSession.closeHandler(asyncResult2 -> {
                    protonSession.close();
                });
                protonSession.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                Message create = Message.Factory.create();
                create.setBody(new AmqpValue(str));
                HashMap hashMap = new HashMap();
                hashMap.put(str2, str3);
                create.setApplicationProperties(new ApplicationProperties(hashMap));
                protonSender.open();
                protonSender.send(create, protonDelivery -> {
                    testContext.assertNotNull(protonDelivery.getRemoteState(), "message had no remote state");
                    testContext.assertTrue(protonDelivery.getRemoteState() instanceof Accepted, "message was not accepted");
                    testContext.assertTrue(protonDelivery.remotelySettled(), "message was not settled");
                    async2.complete();
                });
            });
        });
        this.client = AmqpClient.create(new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort()));
        this.client.connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createReceiver(methodName, asyncResult -> {
                ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                    testContext.assertNotNull(amqpMessage, "message was null");
                    testContext.assertNotNull(amqpMessage.bodyAsString(), "amqp message body content was null");
                    testContext.assertEquals(str, amqpMessage.bodyAsString(), "amqp message body was not as expected");
                    testContext.assertTrue(amqpMessage.applicationProperties() != null, "application properties element not present");
                    JsonObject applicationProperties = amqpMessage.applicationProperties();
                    testContext.assertTrue(applicationProperties.containsKey(str2), "expected property key element not present");
                    testContext.assertEquals(str3, applicationProperties.getValue(str2), "app property value not as expected");
                    testContext.assertEquals(1, Integer.valueOf(applicationProperties.size()), "unexpected app properties");
                    async.complete();
                });
            });
        });
        async2.awaitSuccess();
        async.awaitSuccess();
    }

    @Test(timeout = 10000)
    public void testReceptionWithAutoAccept(TestContext testContext) throws Exception {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        this.server = setupMockServer(testContext, 10, (protonDelivery, num) -> {
            testContext.assertEquals(protonDelivery.getRemoteState().getClass(), Accepted.class, "state was not accepted");
            copyOnWriteArrayList.add(num);
            countDownLatch.countDown();
        });
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.client = AmqpClient.create(new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort())).connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createReceiver(uuid, asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                    copyOnWriteArrayList2.add(amqpMessage.bodyAsString());
                });
            });
        });
        Assertions.assertThat(countDownLatch.await(6L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(copyOnWriteArrayList2).containsExactly(new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"});
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @Test(timeout = 10000)
    public void testReceptionWithManuallyAcceptedMessages(TestContext testContext) throws Exception {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        this.server = setupMockServer(testContext, 10, (protonDelivery, num) -> {
            testContext.assertEquals(protonDelivery.getRemoteState().getClass(), Accepted.class, "state was not accepted");
            copyOnWriteArrayList.add(num);
            countDownLatch.countDown();
        });
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.client = AmqpClient.create(new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort())).connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createReceiver(uuid, new AmqpReceiverOptions().setAutoAcknowledgement(false), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                    copyOnWriteArrayList2.add(amqpMessage.bodyAsString());
                    amqpMessage.accepted();
                });
            });
        });
        Assertions.assertThat(countDownLatch.await(6L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(copyOnWriteArrayList2).containsExactly(new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"});
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @Test(timeout = 10000)
    public void testReceptionWithManuallyRejectedMessages(TestContext testContext) throws Exception {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        this.server = setupMockServer(testContext, 10, (protonDelivery, num) -> {
            testContext.assertEquals(protonDelivery.getRemoteState().getClass(), Rejected.class, "state was not rejected");
            copyOnWriteArrayList.add(num);
            countDownLatch.countDown();
        });
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.client = AmqpClient.create(new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort())).connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createReceiver(uuid, new AmqpReceiverOptions().setAutoAcknowledgement(false), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                    copyOnWriteArrayList2.add(amqpMessage.bodyAsString());
                    amqpMessage.rejected();
                });
            });
        });
        Assertions.assertThat(countDownLatch.await(6L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(copyOnWriteArrayList2).containsExactly(new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"});
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @Test(timeout = 10000)
    public void testReceptionWithManuallyModifiedFailedMessages(TestContext testContext) throws Exception {
        doReceptionWithManuallyModifiedMessagesTestImpl(testContext, false);
    }

    @Test(timeout = 10000)
    public void testReceptionWithManuallyModifiedFailedUndeliverableHereMessages(TestContext testContext) throws Exception {
        doReceptionWithManuallyModifiedMessagesTestImpl(testContext, true);
    }

    private void doReceptionWithManuallyModifiedMessagesTestImpl(TestContext testContext, boolean z) throws Exception {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        this.server = setupMockServer(testContext, 10, (protonDelivery, num) -> {
            Modified remoteState = protonDelivery.getRemoteState();
            testContext.assertEquals(remoteState.getClass(), Modified.class, "state was not modified");
            testContext.assertTrue(remoteState.getDeliveryFailed().booleanValue());
            testContext.assertEquals(Boolean.valueOf(z), remoteState.getUndeliverableHere());
            copyOnWriteArrayList.add(num);
            countDownLatch.countDown();
        });
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.client = AmqpClient.create(new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort())).connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createReceiver(uuid, new AmqpReceiverOptions().setAutoAcknowledgement(false), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                    copyOnWriteArrayList2.add(amqpMessage.bodyAsString());
                    amqpMessage.modified(true, z);
                });
            });
        });
        Assertions.assertThat(countDownLatch.await(6L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(copyOnWriteArrayList2).containsExactly(new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"});
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @Test(timeout = 10000)
    public void testReceptionCreatingReceiverWithoutConnection(TestContext testContext) throws Exception {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        this.server = setupMockServer(testContext, 10, (protonDelivery, num) -> {
            testContext.assertEquals(protonDelivery.getRemoteState().getClass(), Accepted.class, "state was not accepted");
            copyOnWriteArrayList.add(num);
            countDownLatch.countDown();
        });
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.client = AmqpClient.create(new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort())).createReceiver(uuid, asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                copyOnWriteArrayList2.add(amqpMessage.bodyAsString());
            });
        });
        Assertions.assertThat(countDownLatch.await(6L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(copyOnWriteArrayList2).containsExactly(new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"});
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    private MockServer setupMockServer(TestContext testContext, int i, BiConsumer<ProtonDelivery, Integer> biConsumer) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        return new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                protonSession.closeHandler(asyncResult2 -> {
                    protonSession.close();
                });
                protonSession.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                protonSender.sendQueueDrainHandler(protonSender -> {
                    while (atomicInteger.get() < i && !protonSender.sendQueueFull()) {
                        Message message = Proton.message();
                        int andIncrement = atomicInteger.getAndIncrement();
                        message.setBody(new AmqpValue(String.valueOf(andIncrement)));
                        protonSender.send(message, protonDelivery -> {
                            testContext.assertNotNull(protonDelivery.getRemoteState(), "message had no state set");
                            biConsumer.accept(protonDelivery, Integer.valueOf(andIncrement));
                            testContext.assertTrue(protonDelivery.remotelySettled(), "message was not settled");
                        });
                    }
                });
                protonSender.open();
            });
        });
    }

    @Test(timeout = 15000)
    public void testReceptionWhenDemandChangesWhileHandlingMessages(TestContext testContext) throws Exception {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(2000);
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        Promise promise = Promise.promise();
        Future future = promise.future();
        this.server = setupMockServer(testContext, 2000, (protonDelivery, num) -> {
            testContext.assertEquals(protonDelivery.getRemoteState().getClass(), Accepted.class, "state was not accepted");
            copyOnWriteArrayList.add(num);
            countDownLatch.countDown();
        });
        this.client = AmqpClient.create(new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort())).connect(asyncResult -> {
            ((AmqpConnection) asyncResult.result()).createReceiver(uuid, asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                AmqpReceiver amqpReceiver = (AmqpReceiver) asyncResult.result();
                amqpReceiver.pause();
                amqpReceiver.handler(amqpMessage -> {
                    copyOnWriteArrayList2.add(amqpMessage.bodyAsString());
                });
                promise.complete(amqpReceiver);
            });
        });
        ConditionFactory await = Awaitility.await();
        future.getClass();
        await.until(future::succeeded);
        AmqpReceiver amqpReceiver = (AmqpReceiver) future.result();
        amqpReceiver.fetch(400L);
        Awaitility.await().pollInterval(20L, TimeUnit.MILLISECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList2.size() == 400);
        });
        amqpReceiver.fetch(1600L);
        Assertions.assertThat(countDownLatch.await(6L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(copyOnWriteArrayList2).containsAll((Iterable) IntStream.range(0, 2000).mapToObj(String::valueOf).collect(Collectors.toList()));
        Assertions.assertThat(copyOnWriteArrayList).containsAll((Iterable) IntStream.range(0, 2000).boxed().collect(Collectors.toList()));
    }

    @Test(timeout = 20000)
    public void testReceiveMultipleMessageAfterDelayedHandlerAddition(TestContext testContext) throws Exception {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(5);
        this.server = setupMockServer(testContext, 5, (protonDelivery, num) -> {
            testContext.assertEquals(protonDelivery.getRemoteState().getClass(), Accepted.class, "state was not accepted");
            copyOnWriteArrayList2.add(num);
            countDownLatch.countDown();
        });
        AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort())).connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createReceiver(this.name.getMethodName(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                this.vertx.setTimer(250L, l -> {
                    ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                        copyOnWriteArrayList.add(amqpMessage.bodyAsString());
                    });
                });
            });
        });
        Assertions.assertThat(countDownLatch.await(6L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new String[]{"0", "1", "2", "3", "4"});
        Assertions.assertThat(copyOnWriteArrayList2).containsExactly(new Integer[]{0, 1, 2, 3, 4});
    }

    @Test(timeout = 20000)
    public void testReceiveMultipleMessageAfterPause(TestContext testContext) throws Exception {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(5);
        this.server = setupMockServer(testContext, 5, (protonDelivery, num) -> {
            testContext.assertEquals(protonDelivery.getRemoteState().getClass(), Accepted.class, "state was not accepted");
            copyOnWriteArrayList2.add(num);
            countDownLatch.countDown();
        });
        AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(this.server.actualPort())).connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            AtomicInteger atomicInteger = new AtomicInteger();
            AtomicLong atomicLong = new AtomicLong();
            ((AmqpConnection) asyncResult.result()).createReceiver(this.name.getMethodName(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                AmqpReceiver amqpReceiver = (AmqpReceiver) asyncResult.result();
                amqpReceiver.handler(amqpMessage -> {
                    int andIncrement = atomicInteger.getAndIncrement();
                    String bodyAsString = amqpMessage.bodyAsString();
                    testContext.assertNotNull(bodyAsString, "amqp message " + andIncrement + " body content was null");
                    testContext.assertEquals(String.valueOf(andIncrement), bodyAsString, "amqp message " + andIncrement + " body not as expected");
                    copyOnWriteArrayList.add(bodyAsString);
                    if (andIncrement == 2) {
                        amqpReceiver.pause();
                        atomicLong.set(System.currentTimeMillis());
                        this.vertx.setTimer(250L, l -> {
                            amqpReceiver.resume();
                        });
                    }
                    if (andIncrement > 2) {
                        testContext.assertTrue(atomicLong.get() > 0, "pause start not initialised before receiving msg" + andIncrement);
                        testContext.assertTrue(System.currentTimeMillis() >= atomicLong.get() + 250, "delivery occurred before expected");
                    }
                });
            });
        });
        Assertions.assertThat(countDownLatch.await(6L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new String[]{"0", "1", "2", "3", "4"});
        Assertions.assertThat(copyOnWriteArrayList2).containsExactly(new Integer[]{0, 1, 2, 3, 4});
    }

    @Test(timeout = 10000)
    public void testNonDurable(TestContext testContext) throws ExecutionException, InterruptedException {
        doDurableReceiverTestImpl(testContext, false, "not-normally-configured-unless-shared-sub", null);
    }

    @Test(timeout = 10000)
    public void testNonDurableReceiverWithAddedSourceCapability(TestContext testContext) throws ExecutionException, InterruptedException {
        doDurableReceiverTestImpl(testContext, false, "my-subscription-name", "shared");
    }

    @Test(timeout = 10000)
    public void testDurableSubscriptionReciever(TestContext testContext) throws ExecutionException, InterruptedException {
        doDurableReceiverTestImpl(testContext, true, "my-durable-subscription-name", null);
    }

    @Test(timeout = 10000)
    public void testDurableReceiverWithAddedSourceCapability(TestContext testContext) throws ExecutionException, InterruptedException {
        doDurableReceiverTestImpl(testContext, true, "my-durable-subscription-name", "shared");
    }

    private void doDurableReceiverTestImpl(TestContext testContext, boolean z, String str, String str2) throws InterruptedException, ExecutionException {
        Async async = testContext.async();
        Async async2 = testContext.async();
        this.server = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                protonSender.closeHandler(asyncResult2 -> {
                    testContext.assertFalse(z, "unexpected link close for durable sub");
                    protonSender.close();
                });
                protonSender.detachHandler(asyncResult3 -> {
                    testContext.assertTrue(z, "unexpected link detach for non-durable sub");
                    protonSender.detach();
                });
                protonSender.open();
                testContext.assertEquals(str, protonSender.getName(), "unexpected link name");
                testContext.assertNotNull(protonSender.getRemoteSource(), "source should not be null");
                Source remoteSource = protonSender.getRemoteSource();
                if (z) {
                    testContext.assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy(), "unexpected expiry");
                    testContext.assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable(), "unexpected durability");
                }
                if (str2 != null) {
                    Symbol[] symbolArr = {Symbol.valueOf(str2)};
                    Symbol[] capabilities = remoteSource.getCapabilities();
                    testContext.assertTrue(Arrays.equals(symbolArr, capabilities), "Unexpected capabilities: " + Arrays.toString(capabilities));
                }
                async2.complete();
            });
        });
        AmqpClient.create(this.vertx, new AmqpClientOptions().setPort(this.server.actualPort()).setHost("localhost")).connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            AmqpConnection amqpConnection = (AmqpConnection) asyncResult.result();
            AmqpReceiverOptions amqpReceiverOptions = new AmqpReceiverOptions();
            amqpReceiverOptions.setLinkName(str);
            if (z) {
                amqpReceiverOptions.setDurable(true);
            }
            if (str2 != null) {
                amqpReceiverOptions.addCapability(str2);
            }
            amqpConnection.createReceiver("myAddress", amqpReceiverOptions, asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                async.complete();
            });
        });
        async2.awaitSuccess();
        async.awaitSuccess();
    }
}
