package io.atomix.cluster.messaging.impl;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/atomix/cluster/messaging/impl/NettyMessagingServiceTest.class */
public class NettyMessagingServiceTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyMessagingServiceTest.class);
    private static final String IP_STRING = "127.0.0.1";
    ManagedMessagingService netty1;
    ManagedMessagingService netty2;
    ManagedMessagingService nettyv11;
    ManagedMessagingService nettyv12;
    ManagedMessagingService nettyv21;
    ManagedMessagingService nettyv22;
    Address address1;
    Address address2;
    Address addressv11;
    Address addressv12;
    Address addressv21;
    Address addressv22;
    Address invalidAddress;
    private ManagedMessagingService messagingService;

    @Before
    public void setUp() throws Exception {
        this.address1 = Address.from(SocketUtil.getNextAddress().getPort());
        MessagingConfig shutdownQuietPeriod = new MessagingConfig().setShutdownQuietPeriod(Duration.ofMillis(50L));
        this.netty1 = (ManagedMessagingService) new NettyMessagingService("test", this.address1, shutdownQuietPeriod).start().join();
        this.address2 = Address.from(SocketUtil.getNextAddress().getPort());
        this.netty2 = (ManagedMessagingService) new NettyMessagingService("test", this.address2, shutdownQuietPeriod).start().join();
        this.addressv11 = Address.from(SocketUtil.getNextAddress().getPort());
        this.nettyv11 = (ManagedMessagingService) new NettyMessagingService("test", this.addressv11, shutdownQuietPeriod, ProtocolVersion.V1).start().join();
        this.addressv12 = Address.from(SocketUtil.getNextAddress().getPort());
        this.nettyv12 = (ManagedMessagingService) new NettyMessagingService("test", this.addressv12, shutdownQuietPeriod, ProtocolVersion.V1).start().join();
        this.addressv21 = Address.from(SocketUtil.getNextAddress().getPort());
        this.nettyv21 = (ManagedMessagingService) new NettyMessagingService("test", this.addressv21, shutdownQuietPeriod, ProtocolVersion.V2).start().join();
        this.addressv22 = Address.from(SocketUtil.getNextAddress().getPort());
        this.nettyv22 = (ManagedMessagingService) new NettyMessagingService("test", this.addressv22, shutdownQuietPeriod, ProtocolVersion.V2).start().join();
        this.invalidAddress = Address.from(IP_STRING, 5007);
    }

    private String nextSubject() {
        return UUID.randomUUID().toString();
    }

    @After
    public void tearDown() throws Exception {
        ((Stream) Stream.of((Object[]) new ManagedMessagingService[]{this.netty1, this.netty2, this.nettyv11, this.nettyv12, this.nettyv21, this.nettyv22, this.messagingService}).parallel()).filter((v0) -> {
            return Objects.nonNull(v0);
        }).forEach(managedMessagingService -> {
            try {
                managedMessagingService.stop().join();
            } catch (Exception e) {
                LOGGER.warn("Failed stopping NettyMessagingService {}", managedMessagingService, e);
            }
        });
    }

    @Test
    public void testSendAsyncToUnresolvable() {
        Assert.assertTrue(this.netty1.sendAsync(Address.from("unknown.local", this.address1.port()), nextSubject(), "hello world".getBytes()).isCompletedExceptionally());
    }

    @Test
    public void testSendAsync() {
        String nextSubject = nextSubject();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.netty1.sendAsync(this.address2, nextSubject, "hello world".getBytes()).whenComplete((r3, th) -> {
            Assert.assertNull(th);
            countDownLatch.countDown();
        });
        Uninterruptibles.awaitUninterruptibly(countDownLatch);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.netty1.sendAsync(this.invalidAddress, nextSubject, "hello world".getBytes()).whenComplete((r32, th2) -> {
            Assert.assertNotNull(th2);
            Assert.assertTrue(th2 instanceof ConnectException);
            countDownLatch2.countDown();
        });
        Uninterruptibles.awaitUninterruptibly(countDownLatch2);
    }

    @Test
    public void testSendAndReceive() {
        String nextSubject = nextSubject();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        this.netty2.registerHandler(nextSubject, (address, bArr) -> {
            atomicBoolean.set(true);
            atomicReference2.set(address);
            atomicReference.set(bArr);
            return "hello there".getBytes();
        }, MoreExecutors.directExecutor());
        Assert.assertTrue(Arrays.equals("hello there".getBytes(), (byte[]) this.netty1.sendAndReceive(this.address2, nextSubject, "hello world".getBytes()).join()));
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertTrue(Arrays.equals((byte[]) atomicReference.get(), "hello world".getBytes()));
        Assert.assertEquals(this.address1.address(), ((Address) atomicReference2.get()).address());
    }

    @Test
    public void shouldCompleteExistingRequestFutureExceptionallyWhenMessagingServiceIsClosed() {
        CompletableFuture sendAndReceive = this.netty1.sendAndReceive(this.address2, nextSubject(), "hello world".getBytes(), true, Duration.ofSeconds(5L));
        this.netty1.stop().join();
        Assertions.assertThat(sendAndReceive).isCompletedExceptionally();
    }

    @Test
    public void shouldCompleteExistingRequestWithKeepAliveExceptionallyWhenMessagingServiceIsClosed() {
        CompletableFuture sendAndReceive = this.netty1.sendAndReceive(this.address2, nextSubject(), "hello world".getBytes(), true, Duration.ofSeconds(5L));
        this.netty1.stop().join();
        Assertions.assertThat(sendAndReceive).isCompletedExceptionally();
    }

    @Test
    public void shouldCompleteFutureExceptionallyIfMessagingServiceIsClosedInBetween() {
        String nextSubject = nextSubject();
        CompletableFuture stop = this.netty1.stop();
        CompletableFuture sendAndReceive = this.netty1.sendAndReceive(this.address2, nextSubject, "hello world".getBytes(), false, Duration.ofSeconds(5L));
        stop.join();
        Assertions.assertThat(sendAndReceive).isCompletedExceptionally();
    }

    @Test
    public void shouldCompleteRequestWithKeepAliveExceptionallyIfMessagingServiceIsClosedInBetween() {
        String nextSubject = nextSubject();
        CompletableFuture stop = this.netty1.stop();
        CompletableFuture sendAndReceive = this.netty1.sendAndReceive(this.address2, nextSubject, "hello world".getBytes(), true, Duration.ofSeconds(5L));
        stop.join();
        Assertions.assertThat(sendAndReceive).isCompletedExceptionally();
    }

    @Test
    public void shouldCompleteFutureExceptionallyIfMessagingServiceHasAlreadyClosed() {
        String nextSubject = nextSubject();
        this.netty1.stop().join();
        Assertions.assertThat(this.netty1.sendAndReceive(this.address2, nextSubject, "hello world".getBytes(), false, Duration.ofSeconds(5L))).isCompletedExceptionally();
    }

    @Test
    public void shouldCompleteRequestWithKeepAliveExceptionallyIfMessagingServiceHasAlreadyClosed() {
        String nextSubject = nextSubject();
        this.netty1.stop().join();
        Assertions.assertThat(this.netty1.sendAndReceive(this.address2, nextSubject, "hello world".getBytes(), true, Duration.ofSeconds(5L))).isCompletedExceptionally();
    }

    @Test
    public void testTransientSendAndReceive() {
        String nextSubject = nextSubject();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        this.netty2.registerHandler(nextSubject, (address, bArr) -> {
            atomicBoolean.set(true);
            atomicReference2.set(address);
            atomicReference.set(bArr);
            return "hello there".getBytes();
        }, MoreExecutors.directExecutor());
        Assert.assertTrue(Arrays.equals("hello there".getBytes(), (byte[]) this.netty1.sendAndReceive(this.address2, nextSubject, "hello world".getBytes(), false).join()));
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertTrue(Arrays.equals((byte[]) atomicReference.get(), "hello world".getBytes()));
        Assert.assertEquals(this.address1.address(), ((Address) atomicReference2.get()).address());
    }

    @Test
    public void testSendAndReceiveWithFixedTimeout() {
        String nextSubject = nextSubject();
        this.netty2.registerHandler(nextSubject, (address, bArr) -> {
            return new CompletableFuture();
        });
        try {
            this.netty1.sendAndReceive(this.address2, nextSubject, "hello world".getBytes(), Duration.ofSeconds(1L)).join();
            Assert.fail();
        } catch (CompletionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutException);
        }
    }

    @Test
    @Ignore
    public void testSendAndReceiveWithExecutor() {
        String nextSubject = nextSubject();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(runnable -> {
            return new Thread(runnable, "completion-thread");
        });
        ExecutorService newSingleThreadExecutor2 = Executors.newSingleThreadExecutor(runnable2 -> {
            return new Thread(runnable2, "handler-thread");
        });
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.netty2.registerHandler(nextSubject, (address, bArr) -> {
            atomicReference.set(Thread.currentThread().getName());
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                Assert.fail("InterruptedException");
            }
            return "hello there".getBytes();
        }, newSingleThreadExecutor2);
        CompletableFuture sendAndReceive = this.netty1.sendAndReceive(this.address2, nextSubject, "hello world".getBytes(), newSingleThreadExecutor);
        sendAndReceive.whenComplete((bArr2, th) -> {
            atomicReference2.set(Thread.currentThread().getName());
        });
        countDownLatch.countDown();
        Assert.assertTrue(Arrays.equals("hello there".getBytes(), (byte[]) sendAndReceive.join()));
        Assert.assertEquals("completion-thread", atomicReference2.get());
        Assert.assertEquals("handler-thread", atomicReference.get());
    }

    @Test
    public void testV1() throws Exception {
        byte[] bytes = "Hello world!".getBytes();
        String nextSubject = nextSubject();
        this.nettyv11.registerHandler(nextSubject, (address, bArr) -> {
            return CompletableFuture.completedFuture(bArr);
        });
        Assert.assertArrayEquals(bytes, (byte[]) this.nettyv12.sendAndReceive(this.addressv11, nextSubject, bytes).get(10L, TimeUnit.SECONDS));
    }

    @Test
    public void testV2() throws Exception {
        byte[] bytes = "Hello world!".getBytes();
        String nextSubject = nextSubject();
        this.nettyv21.registerHandler(nextSubject, (address, bArr) -> {
            return CompletableFuture.completedFuture(bArr);
        });
        Assert.assertArrayEquals(bytes, (byte[]) this.nettyv22.sendAndReceive(this.addressv21, nextSubject, bytes).get(10L, TimeUnit.SECONDS));
    }

    @Test
    public void testVersionNegotiation() throws Exception {
        byte[] bytes = "Hello world!".getBytes();
        String nextSubject = nextSubject();
        this.nettyv11.registerHandler(nextSubject, (address, bArr) -> {
            return CompletableFuture.completedFuture(bArr);
        });
        Assert.assertArrayEquals(bytes, (byte[]) this.nettyv21.sendAndReceive(this.addressv11, nextSubject, bytes).get(10L, TimeUnit.SECONDS));
        String nextSubject2 = nextSubject();
        this.nettyv22.registerHandler(nextSubject2, (address2, bArr2) -> {
            return CompletableFuture.completedFuture(bArr2);
        });
        Assert.assertArrayEquals(bytes, (byte[]) this.nettyv12.sendAndReceive(this.addressv22, nextSubject2, bytes).get(10L, TimeUnit.SECONDS));
    }

    @Test
    public void shouldNotBindToAdvertisedAddress() {
        Address from = Address.from(SocketUtil.getNextAddress().getPort());
        MessagingConfig messagingConfig = new MessagingConfig();
        messagingConfig.setInterfaces(List.of(from.host()));
        messagingConfig.setPort(Integer.valueOf(from.port()));
        Address address = new Address("invalid.host", 1);
        this.messagingService = (ManagedMessagingService) new NettyMessagingService("test", address, messagingConfig).start().join();
        Assertions.assertThat(this.messagingService.bindingAddresses()).contains(new Address[]{from});
        Assertions.assertThat(this.messagingService.address()).isEqualTo(address);
    }

    @Test
    public void testRemoteHandlerFailure() {
        MessagingException.RemoteHandlerFailure remoteHandlerFailure = new MessagingException.RemoteHandlerFailure("foo bar");
        BiFunction biFunction = (address, bArr) -> {
            throw new RuntimeException("foo bar");
        };
        String nextSubject = nextSubject();
        this.netty2.registerHandler(nextSubject, biFunction, MoreExecutors.directExecutor());
        Assertions.assertThat(this.netty1.sendAndReceive(this.address2, nextSubject, "fail".getBytes())).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.RemoteHandlerFailure.class).withMessage(remoteHandlerFailure.getMessage());
    }

    @Test
    public void testRemoteHandlerFailureNullValue() {
        MessagingException.RemoteHandlerFailure remoteHandlerFailure = new MessagingException.RemoteHandlerFailure((String) null);
        BiFunction biFunction = (address, bArr) -> {
            throw new RuntimeException();
        };
        String nextSubject = nextSubject();
        this.netty2.registerHandler(nextSubject, biFunction, MoreExecutors.directExecutor());
        Assertions.assertThat(this.netty1.sendAndReceive(this.address2, nextSubject, "fail".getBytes())).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.RemoteHandlerFailure.class).withMessage(remoteHandlerFailure.getMessage());
    }

    @Test
    public void testRemoteHandlerFailureEmptyStringValue() {
        MessagingException.RemoteHandlerFailure remoteHandlerFailure = new MessagingException.RemoteHandlerFailure((String) null);
        BiFunction biFunction = (address, bArr) -> {
            throw new RuntimeException("");
        };
        String nextSubject = nextSubject();
        this.netty2.registerHandler(nextSubject, biFunction, MoreExecutors.directExecutor());
        Assertions.assertThat(this.netty1.sendAndReceive(this.address2, nextSubject, "fail".getBytes())).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.RemoteHandlerFailure.class).withMessage(remoteHandlerFailure.getMessage());
    }

    @Test
    public void testCompletableRemoteHandlerFailure() {
        MessagingException.RemoteHandlerFailure remoteHandlerFailure = new MessagingException.RemoteHandlerFailure("foo bar");
        String nextSubject = nextSubject();
        this.netty2.registerHandler(nextSubject, (address, bArr) -> {
            return CompletableFuture.failedFuture(new RuntimeException("foo bar"));
        });
        Assertions.assertThat(this.netty1.sendAndReceive(this.address2, nextSubject, "fail".getBytes())).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.RemoteHandlerFailure.class).withMessage(remoteHandlerFailure.getMessage());
    }

    @Test
    public void testCompletableRemoteHandlerFailureNullValue() {
        MessagingException.RemoteHandlerFailure remoteHandlerFailure = new MessagingException.RemoteHandlerFailure((String) null);
        String nextSubject = nextSubject();
        this.netty2.registerHandler(nextSubject, (address, bArr) -> {
            return CompletableFuture.failedFuture(new RuntimeException());
        });
        Assertions.assertThat(this.netty1.sendAndReceive(this.address2, nextSubject, "fail".getBytes())).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.RemoteHandlerFailure.class).withMessage(remoteHandlerFailure.getMessage());
    }

    @Test
    public void testCompletableRemoteHandlerFailureEmptyStringValue() {
        MessagingException.RemoteHandlerFailure remoteHandlerFailure = new MessagingException.RemoteHandlerFailure((String) null);
        String nextSubject = nextSubject();
        this.netty2.registerHandler(nextSubject, (address, bArr) -> {
            return CompletableFuture.failedFuture(new RuntimeException(""));
        });
        Assertions.assertThat(this.netty1.sendAndReceive(this.address2, nextSubject, "fail".getBytes())).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.RemoteHandlerFailure.class).withMessage(remoteHandlerFailure.getMessage());
    }

    @Test
    public void testNoRemoteHandlerException() {
        String nextSubject = nextSubject();
        Assertions.assertThat(this.netty1.sendAndReceive(this.address2, nextSubject, "fail".getBytes())).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.NoRemoteHandler.class).withMessage(new MessagingException.NoRemoteHandler(nextSubject).getMessage());
    }

    @Test
    public void testNoRemoteHandlerExceptionEmptyStringValue() {
        Assertions.assertThat(this.netty1.sendAndReceive(this.address2, "", "fail".getBytes())).failsWithin(Duration.ofSeconds(5L)).withThrowableOfType(ExecutionException.class).havingRootCause().isInstanceOf(MessagingException.NoRemoteHandler.class).withMessage(new MessagingException.NoRemoteHandler((String) null).getMessage());
    }
}
