package io.aeron.samples.stress;

import io.aeron.Aeron;
import io.aeron.FragmentAssembler;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.logbuffer.Header;
import java.nio.ByteOrder;
import java.util.Iterator;
import java.util.Random;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.SystemEpochClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.YieldingIdleStrategy;

/* loaded from: input_file:io/aeron/samples/stress/StressUnicastClient.class */
public class StressUnicastClient implements Agent {
    private static final long TIMEOUT_MS = 5000;
    private final String serverAddress;
    private final String clientAddress;
    private final EpochClock clock;
    private final MutableDirectBuffer msg;
    private final int maxInflight;
    private final int totalToSend;
    private final int mtu;
    private int messageLength;
    private Aeron aeron;
    private Publication unicastPublication;
    private Subscription unicastSubscription;
    private final Long2ObjectHashMap<Boolean> inflightMessages = new Long2ObjectHashMap<>();
    private final FragmentAssembler unicastRspAssembler = new FragmentAssembler(this::unicastRspHandler);
    private final byte[] buffer = new byte[131008];
    private final CRC64 crc = new CRC64();
    private long correlationId = 0;
    private long lastMessageSent = 0;

    public StressUnicastClient(String str, String str2, EpochClock epochClock, int i, int i2, int i3) {
        this.serverAddress = str;
        this.clientAddress = str2;
        this.clock = epochClock;
        this.maxInflight = i;
        this.totalToSend = i2;
        this.mtu = i3;
        new Random(42L).nextBytes(this.buffer);
        this.msg = new UnsafeBuffer(this.buffer);
    }

    public void onStart() {
        StressUtil.info("server=" + this.serverAddress + ", client=" + this.clientAddress);
        this.aeron = Aeron.connect(new Aeron.Context());
        StressUtil.info("Connected to Aeron dir=" + this.aeron.context().aeronDirectoryName());
        this.unicastPublication = this.aeron.addExclusivePublication(StressUtil.unicastReqChannel(this.serverAddress).mtu(Integer.valueOf(this.mtu)).linger(0L).build(), StressUtil.UNICAST_STREAM_ID);
        this.unicastSubscription = this.aeron.addSubscription(StressUtil.unicastRspChannel(this.clientAddress).build(), StressUtil.UNICAST_STREAM_ID);
        StressUtil.info("publications and subscriptions created");
    }

    public int doWork() {
        int i;
        if (0 == this.messageLength) {
            throw new IllegalStateException("messageLength has not been set");
        }
        int i2 = 0;
        while (this.inflightMessages.size() < this.maxInflight && this.correlationId < this.totalToSend && 0 < this.unicastPublication.offer(this.msg, 0, this.messageLength, this::currentCorrelationId)) {
            this.inflightMessages.put(this.correlationId, Boolean.TRUE);
            this.correlationId++;
            i2++;
            this.lastMessageSent = this.clock.time();
        }
        int i3 = 0;
        while (true) {
            i = i3;
            int poll = this.unicastSubscription.poll(this.unicastRspAssembler, this.maxInflight);
            if (0 == poll) {
                break;
            }
            i3 = i + poll;
        }
        if (0 < this.correlationId && 0 == i && !this.inflightMessages.isEmpty()) {
            long time = this.clock.time() - this.lastMessageSent;
            if (TIMEOUT_MS < time) {
                throw new RuntimeException("No response received for " + time + "ms, client=" + this);
            }
        }
        return i2;
    }

    private boolean isComplete() {
        return ((long) this.totalToSend) <= this.correlationId && this.inflightMessages.isEmpty();
    }

    private void unicastRspHandler(DirectBuffer directBuffer, int i, int i2, Header header) {
        long reservedValue = header.reservedValue();
        StressUtil.validateMessage(this.crc, directBuffer, i, i2, reservedValue);
        this.inflightMessages.remove(reservedValue);
    }

    private long currentCorrelationId(DirectBuffer directBuffer, int i, int i2) {
        return this.correlationId;
    }

    void reset(int i) {
        this.messageLength = i;
        this.correlationId = 0L;
        this.msg.putLong(0, this.crc.recalculate(this.buffer, 8, i - 8), ByteOrder.LITTLE_ENDIAN);
    }

    public void onClose() {
        CloseHelper.quietCloseAll(new AutoCloseable[]{this.aeron});
    }

    public String roleName() {
        return null;
    }

    public String toString() {
        return "StressClient{inflightMessages=" + this.inflightMessages + ", mtu=" + this.mtu + ", messageLength=" + this.messageLength + ", correlationId=" + this.correlationId + ", lastMessageSent=" + this.lastMessageSent + '}';
    }

    public static void main(String[] strArr) {
        YieldingIdleStrategy yieldingIdleStrategy = new YieldingIdleStrategy();
        Iterator<Integer> it = StressUtil.MTU_LENGTHS.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            StressUnicastClient stressUnicastClient = new StressUnicastClient(StressUtil.serverAddress(), StressUtil.clientAddress(), new SystemEpochClock(), 20, 100, intValue);
            stressUnicastClient.onStart();
            for (int i = 32; i <= 131008; i += 1024) {
                try {
                    stressUnicastClient.reset(i);
                    while (!stressUnicastClient.isComplete()) {
                        yieldingIdleStrategy.idle(stressUnicastClient.doWork());
                    }
                } catch (Throwable th) {
                    stressUnicastClient.onClose();
                    throw th;
                }
            }
            StressUtil.info("Complete mtu=" + intValue);
            stressUnicastClient.onClose();
        }
    }
}
